diff --git a/Dockerfile b/Dockerfile
index 66dc8b5..93760fd 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,6 +1,6 @@
 # syntax = docker/dockerfile:experimental
 # the following is updated automatically by make update-build-image-tag
-FROM quay.io/travelping/upg-build:10f03c8684150c9d0b492f050ca14d1e AS build-stage
+FROM quay.io/travelping/upg-build:5a3ed8c846175982e2e70a5edd3db8d4 AS build-stage

 ADD vpp /src/vpp
 ADD upf /src/upf
diff --git a/Dockerfile.build b/Dockerfile.build
index 48d676c..98d3cfe 100644
--- a/Dockerfile.build
+++ b/Dockerfile.build
@@ -23,7 +23,7 @@ RUN --mount=target=/var/lib/apt/lists,type=cache,sharing=private \
     DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
     build-essential sudo git netbase curl ca-certificates \
     golang-go iproute2 gdb tcpdump iputils-ping libpcap-dev \
-    dumb-init && \
+    dumb-init iperf3 && \
     curl -sSL "https://github.com/moby/buildkit/releases/download/${BUILDKIT_VERSION}/buildkit-${BUILDKIT_VERSION}.linux-amd64.tar.gz" | \
     tar -xvz -C /usr/local bin/buildctl && \
    echo "${BUILDCTL_SHA256} /usr/local/bin/buildctl" | sha256sum -c && \
diff --git a/Dockerfile.devel b/Dockerfile.devel
index 86d05c1..d9bc9f0 100644
--- a/Dockerfile.devel
+++ b/Dockerfile.devel
@@ -1,6 +1,6 @@
 # syntax = docker/dockerfile:experimental
 # the following is updated automatically by make update-build-image-tag
-FROM quay.io/travelping/upg-build:10f03c8684150c9d0b492f050ca14d1e AS build-stage
+FROM quay.io/travelping/upg-build:5a3ed8c846175982e2e70a5edd3db8d4 AS build-stage

 ADD vpp /src/vpp
 ADD upf /src/upf
diff --git a/test/e2e/framework/iperf3.go b/test/e2e/framework/iperf3.go
new file mode 100644
index 0000000..7e69ada
--- /dev/null
+++ b/test/e2e/framework/iperf3.go
@@ -0,0 +1,108 @@
+package framework
+
+import (
+    "bytes"
+    "context"
+    "encoding/json"
+    "math"
+    "net"
+    "os/exec"
+    "strconv"
+    "time"
+
+    "github.com/pkg/errors"
+    "github.com/travelping/upg-vpp/test/e2e/network"
+)
+
+type IPerf3 struct {
+    ServerMode bool
+    Duration   time.Duration
+    NS         *network.NetNS
+    ServerIP   net.IP
+    Reverse    bool
+    cmd        *exec.Cmd
+}
+
+type IPerfResult struct {
+    Error string         `json:"error"`
+    End   IPerfResultEnd `json:"end"`
+}
+
+type IPerfResultEnd struct {
+    SumSent     IPerfResultByteStats `json:"sum_sent"`
+    SumReceived IPerfResultByteStats `json:"sum_received"`
+}
+
+type IPerfResultByteStats struct {
+    // uint64, not uint32: a multi-second run on a fast link can move
+    // more than 4 GB, which would overflow a 32-bit counter
+    Bytes uint64 `json:"bytes"`
+}
+
+func (ipf *IPerf3) Start() error {
+    args := []string{
+        "--net=" + ipf.NS.Path(),
+        "iperf3",
+        "-J", // JSON output
+    }
+
+    if ipf.ServerMode {
+        args = append(args, "-s", "-1") // -1: serve a single client, then exit
+    } else {
+        args = append(
+            args, "-c", ipf.ServerIP.String(),
+            "-t", strconv.Itoa(int(math.Round(ipf.Duration.Seconds()))))
+    }
+
+    if ipf.Reverse {
+        args = append(args, "-R")
+    }
+
+    ipf.cmd = exec.Command("nsenter", args...)
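+    // stdout receives the JSON result (-J above), while stderr may
+    // carry an error message from nsenter or iperf3; Wait() checks both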
+    ipf.cmd.Stdout = &bytes.Buffer{}
+    ipf.cmd.Stderr = &bytes.Buffer{}
+    if err := ipf.cmd.Start(); err != nil {
+        return errors.Wrap(err, "error starting iperf3")
+    }
+
+    return nil
+}
+
+func (ipf *IPerf3) Kill() {
+    // ProcessState is only non-nil after Wait() has returned, so guard
+    // against a nil dereference when the process is still running
+    if ipf.cmd.ProcessState == nil || !ipf.cmd.ProcessState.Exited() {
+        ipf.cmd.Process.Kill()
+    }
+}
+
+func (ipf *IPerf3) Wait(ctx context.Context) (*IPerfResult, error) {
+    doneCh := make(chan struct{})
+    defer close(doneCh)
+    go func() {
+        select {
+        case <-ctx.Done():
+            ipf.Kill()
+        case <-doneCh:
+        }
+    }()
+
+    // In JSON mode (-J), iperf3 itself doesn't print anything on stderr,
+    // but there may be an error message from nsenter
+    runErr := ipf.cmd.Wait()
+    if runErr != nil {
+        errMsg := ipf.cmd.Stderr.(*bytes.Buffer).Bytes()
+        if len(errMsg) != 0 {
+            return nil, errors.Wrapf(runErr, "nsenter/iperf3 failed:\n%s", errMsg)
+        }
+        // no error message on stderr; the error is parsed from stdout below
+    }
+
+    out := ipf.cmd.Stdout.(*bytes.Buffer)
+    var r IPerfResult
+    if err := json.Unmarshal(out.Bytes(), &r); err != nil {
+        return nil, errors.Wrapf(err, "error unmarshalling iperf3 result:\n%s", out.Bytes())
+    }
+
+    if runErr != nil {
+        return nil, errors.Wrapf(runErr, "error running iperf3: %s", r.Error)
+    }
+
+    return &r, nil
+}
diff --git a/test/e2e/upg_e2e.go b/test/e2e/upg_e2e.go
index 3393822..aa7117a 100644
--- a/test/e2e/upg_e2e.go
+++ b/test/e2e/upg_e2e.go
@@ -177,6 +177,24 @@ func describeMeasurement(f *framework.Framework) {
             }
             verifyNonAppMeasurement(f, ms, proto, nil)
         })
+
+        ginkgo.It("works with iperf3", func() {
+            verifyIPerf3(f, false)
+            // FIXME: in case of an iperf3 run, e2e traffic measurements
+            // may be imprecise. We might need to wait longer to make sure
+            // all of the data is sent
+            // https://github.com/esnet/iperf/issues/994
+            // ms = deleteSession(f, seid, true)
+            // verifyNonAppMeasurement(f, ms, layers.IPProtocolTCP, nil)
+        })
+
+        ginkgo.It("works with iperf3 [reverse]", func() {
+            verifyIPerf3(f, true)
+            // FIXME: possibly imprecise measurement that's not a UPG
+            // bug, see above
+            // ms = deleteSession(f, seid, true)
+            // verifyNonAppMeasurement(f, ms, layers.IPProtocolTCP, nil)
+        })
     })

     ginkgo.Context("[ip rules]", func() {
@@ -265,6 +283,58 @@ func describeMeasurement(f *framework.Framework) {
         ginkgo.It("can survive session creation-deletion loop", func() {
             verifySessionDeletionLoop(f, &seid)
         })
+
+        ginkgo.It("works with iperf3", func() {
+            out, err := f.VPP.Ctl("show upf proxy")
+            gomega.Expect(err).NotTo(gomega.HaveOccurred())
+            gomega.Expect(out).To(gomega.ContainSubstring("Force stitching: on"))
+
+            verifyIPerf3(f, false)
+            // FIXME: possibly imprecise measurement that's not a UPG
+            // bug, see above
+            // ms = deleteSession(f, seid, true)
+            // verifyNonAppMeasurement(f, ms, layers.IPProtocolTCP, nil)
+        })
+
+        ginkgo.It("works with iperf3 [reverse]", func() {
+            out, err := f.VPP.Ctl("show upf proxy")
+            gomega.Expect(err).NotTo(gomega.HaveOccurred())
+            gomega.Expect(out).To(gomega.ContainSubstring("Force stitching: on"))
+
+            verifyIPerf3(f, true)
+            // FIXME: possibly imprecise measurement that's not a UPG
+            // bug, see above
+            // ms = deleteSession(f, seid, true)
+            // verifyNonAppMeasurement(f, ms, layers.IPProtocolTCP, nil)
+        })
+
+        ginkgo.It("works with iperf3 [no force-stitching]", func() {
+            _, err := f.VPP.Ctl("set upf proxy force-stitching off")
+            gomega.Expect(err).NotTo(gomega.HaveOccurred())
+            out, err := f.VPP.Ctl("show upf proxy")
+            gomega.Expect(err).NotTo(gomega.HaveOccurred())
+            gomega.Expect(out).To(gomega.ContainSubstring("Force stitching: off"))
+
+            verifyIPerf3(f, false)
+            // FIXME: possibly imprecise measurement that's not a UPG
+            // bug, see above
+            // ms = deleteSession(f, seid, true)
+            // verifyNonAppMeasurement(f, ms, layers.IPProtocolTCP, nil)
+        })
+
+        ginkgo.It("works with iperf3 [no force-stitching] [reverse]", func() {
+            _, err := f.VPP.Ctl("set upf proxy force-stitching off")
+            gomega.Expect(err).NotTo(gomega.HaveOccurred())
+            out, err := f.VPP.Ctl("show upf proxy")
+            gomega.Expect(err).NotTo(gomega.HaveOccurred())
+            gomega.Expect(out).To(gomega.ContainSubstring("Force stitching: off"))
+
+            verifyIPerf3(f, true)
+            // FIXME: possibly imprecise measurement that's not a UPG
+            // bug, see above
+            // ms = deleteSession(f, seid, true)
+            // verifyNonAppMeasurement(f, ms, layers.IPProtocolTCP, nil)
+        })
     })

     sessionContext("[redirects]", framework.SessionConfig{Redirect: true}, func() {
@@ -593,7 +663,8 @@ var _ = ginkgo.Describe("[Reporting]", func() {
         gomega.Expect(string(out)).To(gomega.ContainSubstring("Monitoring Time"))

         ginkgo.By("Starting some traffic")
-        tg, clientNS, serverNS := newTrafficGen(f, &traffic.UDPPingConfig{
+        clientNS, serverNS := getClientAndServerNamespaces(f)
+        tg := newTrafficGen(f, &traffic.UDPPingConfig{
             PacketCount: 50, // 5s
             Retry:       true,
             Delay:       100 * time.Millisecond,
@@ -668,7 +739,8 @@ var _ = ginkgo.Describe("[Reporting]", func() {
         gomega.Expect(string(out)).To(gomega.ContainSubstring("Monitoring Time"))

         ginkgo.By("Starting some traffic")
-        tg, clientNS, serverNS := newTrafficGen(f, &traffic.UDPPingConfig{
+        clientNS, serverNS := getClientAndServerNamespaces(f)
+        tg := newTrafficGen(f, &traffic.UDPPingConfig{
             PacketCount: 180, // 18s
             Retry:       true,
             Delay:       100 * time.Millisecond,
@@ -783,7 +855,8 @@ var _ = ginkgo.Describe("[Reporting]", func() {
         gomega.Expect(string(out)).To(gomega.ContainSubstring(seidHex))

         ginkgo.By("Starting some traffic")
-        tg, clientNS, serverNS := newTrafficGen(f, &traffic.UDPPingConfig{
+        clientNS, serverNS := getClientAndServerNamespaces(f)
+        tg := newTrafficGen(f, &traffic.UDPPingConfig{
             PacketCount: 180, // 30s, but will be stopped when VPP exits
             Retry:       true,
             Delay:       100 * time.Millisecond,
@@ -1227,7 +1300,17 @@ func deleteSessions(f *framework.Framework, seids []pfcp.SEID, showInfo bool) []
     return ms
 }

-func newTrafficGen(f *framework.Framework, cfg traffic.TrafficConfig, rec traffic.TrafficRec) (*traffic.TrafficGen, *network.NetNS, *network.NetNS) {
+func getClientAndServerNamespaces(f *framework.Framework) (*network.NetNS, *network.NetNS) {
+    var serverNS *network.NetNS
+    if f.Mode == framework.UPGModeGTPProxy {
+        serverNS = f.VPP.GetNS("srv")
+    } else {
+        serverNS = f.VPP.GetNS("sgi")
+    }
+    return f.VPP.GetNS("ue"), serverNS
+}
+
+func newTrafficGen(f *framework.Framework, cfg traffic.TrafficConfig, rec traffic.TrafficRec) *traffic.TrafficGen {
     ginkgo.By("starting the traffic generator")
     cfg.SetNoLinger(true)
     if !cfg.HasServerIP() {
@@ -1242,24 +1325,19 @@ func newTrafficGen(f *framework.Framework, cfg traffic.TrafficConfig, rec traffi
             cfg.AddServerIP(f.AddServerIP())
         }
     }
-    clientNS := f.VPP.GetNS("ue")
-    var serverNS *network.NetNS
-    if f.Mode == framework.UPGModeGTPProxy {
-        serverNS = f.VPP.GetNS("srv")
-    } else {
-        serverNS = f.VPP.GetNS("sgi")
-    }
-    return traffic.NewTrafficGen(cfg, rec), clientNS, serverNS
+    return traffic.NewTrafficGen(cfg, rec)
 }

 func runTrafficGen(f *framework.Framework, cfg traffic.TrafficConfig, rec traffic.TrafficRec) {
-    tg, clientNS, serverNS := newTrafficGen(f, cfg, rec)
+    clientNS, serverNS := getClientAndServerNamespaces(f)
+    tg := newTrafficGen(f, cfg, rec)
     framework.ExpectNoError(tg.Run(f.Context, clientNS, serverNS))
 }

 func verifyConnFlood(f *framework.Framework, netem bool) {
+    clientNS, serverNS := getClientAndServerNamespaces(f)
     rec := &traffic.SimpleTrafficRec{}
-    tg, clientNS, serverNS := newTrafficGen(f, &traffic.HTTPConfig{
+    tg := newTrafficGen(f, &traffic.HTTPConfig{
         Retry:             true,
         SimultaneousCount: 400, // TODO: 5000 works with bigger chunks but takes up too much memory
         Persist:           true,
@@ -1305,8 +1383,9 @@ func verifyConnFlood(f *framework.Framework, netem bool) {
     }

     // make sure UPG and the session are still alive after the stress test
+    clientNS, serverNS = getClientAndServerNamespaces(f)
     rec = &traffic.SimpleTrafficRec{}
-    tg, clientNS, serverNS = newTrafficGen(f, &traffic.UDPPingConfig{
+    tg = newTrafficGen(f, &traffic.UDPPingConfig{
         PacketCount: 3,
         Retry:       true,
     }, rec)
@@ -1315,7 +1394,8 @@ func verifyConnFlood(f *framework.Framework, netem bool) {

 func verifySessionDeletionLoop(f *framework.Framework, seid *pfcp.SEID) {
     rec := &traffic.SimpleTrafficRec{}
-    tg, clientNS, serverNS := newTrafficGen(f, &traffic.HTTPConfig{
+    clientNS, serverNS := getClientAndServerNamespaces(f)
+    tg := newTrafficGen(f, &traffic.HTTPConfig{
         Retry:             true,
         SimultaneousCount: 400, // TODO: 5000 works with bigger chunks but takes up too much memory
         Persist:           true,
@@ -1354,8 +1434,9 @@ LOOP:
         *seid = startMeasurementSession(f, &framework.SessionConfig{})
     }
     // make sure UPG and the session are still alive after the stress test
+    clientNS, serverNS = getClientAndServerNamespaces(f)
     rec = &traffic.SimpleTrafficRec{}
-    tg, clientNS, serverNS = newTrafficGen(f, &traffic.UDPPingConfig{
+    tg = newTrafficGen(f, &traffic.UDPPingConfig{
         PacketCount: 3,
         Retry:       true,
     }, rec)
@@ -1363,7 +1444,8 @@ LOOP:
 }

 func startTrafficGen(f *framework.Framework, cfg traffic.TrafficConfig, rec traffic.TrafficRec) chan error {
-    tg, clientNS, serverNS := newTrafficGen(f, cfg, rec)
+    clientNS, serverNS := getClientAndServerNamespaces(f)
+    tg := newTrafficGen(f, cfg, rec)
     return tg.Start(f.Context, clientNS, serverNS)
 }

@@ -1520,3 +1602,53 @@ func verifyPSDBU(m message.Message, numUsageReports int) {
         }
     }
 }
+
+func verifyIPerf3(f *framework.Framework, reverse bool) {
+    clientNS, serverNS := getClientAndServerNamespaces(f)
+
+    serverIPerf3 := &framework.IPerf3{
+        ServerMode: true,
+        NS:         serverNS,
+    }
+    gomega.Expect(serverIPerf3.Start()).To(gomega.Succeed())
+    defer func() {
+        serverIPerf3.Kill() // does nothing if the process has exited
+    }()
+
+    clientIPerf3 := &framework.IPerf3{
+        ServerMode: false,
+        Duration:   10 * time.Second,
+        NS:         clientNS,
+        ServerIP:   f.ServerIP(),
+        Reverse:    reverse,
+    }
+    gomega.Expect(clientIPerf3.Start()).To(gomega.Succeed())
+    defer func() {
+        clientIPerf3.Kill()
+    }()
+
+    clientResult, err := clientIPerf3.Wait(f.Context)
+    gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+    serverResult, err := serverIPerf3.Wait(f.Context)
+    gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+    framework.Logf("iperf3: %d bytes sent, %d bytes received",
+        clientResult.End.SumSent.Bytes,
+        clientResult.End.SumReceived.Bytes)
+
+    gomega.Expect(clientResult.End.SumSent.Bytes).
+        To(gomega.BeNumerically(">", 50000000),
+            "low iperf3 transfer volume")
+    gomega.Expect(clientResult.End.SumReceived.Bytes).
+        To(gomega.BeNumerically(">", clientResult.End.SumSent.Bytes/2),
+            "high loss reported by iperf3")
+
+    if reverse {
+        gomega.Expect(clientResult.End.SumSent.Bytes).
+            To(gomega.Equal(serverResult.End.SumSent.Bytes))
+    } else {
+        gomega.Expect(clientResult.End.SumReceived.Bytes).
+            To(gomega.Equal(serverResult.End.SumReceived.Bytes))
+    }
+}
diff --git a/upf/upf_cli.c b/upf/upf_cli.c
index e8e835f..7e29d21 100644
--- a/upf/upf_cli.c
+++ b/upf/upf_cli.c
@@ -1345,6 +1345,16 @@ upf_proxy_set_command_fn (vlib_main_t * vm, unformat_input_t * input,
       else if (unformat (input, "private-segment-size %U",
                          unformat_memory_size, &private_segment_size))
        ;
+      else if (unformat (input, "force-stitching"))
+       {
+         if (unformat (input, "on"))
+           force_stitching = 1;
+         else if (unformat (input, "off"))
+           force_stitching = 0;
+         else
+           return clib_error_return (0, "unknown input `%U'",
+                                     format_unformat_error, input);
+       }
       else
        return clib_error_return (0, "unknown input `%U'",
                                  format_unformat_error, input);
@@ -1363,7 +1373,8 @@ VLIB_CLI_COMMAND (upf_proxy_set_command, static) =
   .short_help = "set upf proxy [mss <mss>] [fifo-size <size>[k|m]]"
   "[max-fifo-size <size>[k|m]][high-watermark <percent>]"
   "[low-watermark <percent>][prealloc-fifos <count>]"
-  "[private-segment-size <size>][private-segment-count <count>]",
+  "[private-segment-size <size>][private-segment-count <count>]"
+  "[force-stitching <on|off>]",
   .function = upf_proxy_set_command_fn,
 };
 /* *INDENT-ON* */
@@ -1395,14 +1406,16 @@ upf_show_proxy_command_fn (vlib_main_t * vm,
                   "Hi/Lo Watermark: %u %% / %u %%\n"
                   "Prealloc FIFOs: %u\n"
                   "Private Segment Count: %u\n"
-                  "Private Segment Size: %U\n",
+                  "Private Segment Size: %U\n"
+                  "Force stitching: %s\n",
                   pm->mss,
                   format_memory_size, pm->fifo_size,
                   format_memory_size, pm->max_fifo_size,
                   pm->high_watermark, pm->low_watermark,
                   pm->prealloc_fifos, pm->private_segment_count,
-                  format_memory_size, pm->private_segment_size);
+                  format_memory_size, pm->private_segment_size,
+                  pm->force_stitching ? "on" : "off");

 done:
   return error;
diff --git a/upf/upf_proxy.c b/upf/upf_proxy.c
index 90c1787..81827c8 100644
--- a/upf/upf_proxy.c
+++ b/upf/upf_proxy.c
@@ -1390,6 +1390,7 @@ upf_proxy_main_init (vlib_main_t * vm)
   pm->prealloc_fifos = 0;
   pm->private_segment_count = 0;
   pm->private_segment_size = 512 << 20;
+  pm->force_stitching = 1;

   pm->server_client_index = ~0;
   pm->active_open_client_index = ~0;
diff --git a/upf/upf_proxy.h b/upf/upf_proxy.h
index 99ba8cd..fe11a84 100644
--- a/upf/upf_proxy.h
+++ b/upf/upf_proxy.h
@@ -51,12 +51,13 @@ typedef struct
 #define foreach_upf_proxy_config_fields                                 \
   _(u16, mss)                    /**< TCP MSS */                        \
   _(uword, fifo_size)            /**< initial fifo size */              \
-  _(uword, max_fifo_size)       /**< max fifo size */                   \
+  _(uword, max_fifo_size)        /**< max fifo size */                  \
   _(u8, high_watermark)          /**< high watermark (%) */             \
   _(u8, low_watermark)           /**< low watermark (%) */              \
   _(u32, private_segment_count)  /**< Number of private fifo segs */    \
-  _(uword, private_segment_size) /**< size of private fifo segs */     \
+  _(uword, private_segment_size) /**< size of private fifo segs */      \
   _(u8, prealloc_fifos)          /**< Request fifo preallocation */     \
+  _(u8, force_stitching)         /**< Force "dirty" proxy stitching */  \

 typedef struct
 {
diff --git a/upf/upf_proxy_input.c b/upf/upf_proxy_input.c
index 9d83cd2..8db74a7 100644
--- a/upf/upf_proxy_input.c
+++ b/upf/upf_proxy_input.c
@@ -132,6 +132,7 @@ splice_tcp_connection (upf_main_t * gtm, flow_entry_t * flow,
   transport_connection_t *tc;
   tcp_connection_t *tcpRx, *tcpTx;
   session_t *s;
+  upf_proxy_main_t *pm = &upf_proxy_main;

   if (rev->conn_index == ~0)
     return UPF_PROXY_INPUT_NEXT_TCP_INPUT;
@@ -198,24 +199,27 @@ splice_tcp_connection (upf_main_t * gtm, flow_entry_t * flow,
       return UPF_PROXY_INPUT_NEXT_TCP_INPUT;
     }

-  if (flow_seq_offs (flow, origin) == 0)
-    flow_seq_offs (flow, origin) = direction == FT_ORIGIN ?
-      tcpTx->snd_nxt - tcpRx->rcv_nxt : tcpRx->rcv_nxt - tcpTx->snd_nxt;
-
-  if (flow_seq_offs (flow, reverse) == 0)
-    flow_seq_offs (flow, reverse) = direction == FT_ORIGIN ?
-      tcpTx->rcv_nxt - tcpRx->snd_nxt : tcpRx->snd_nxt - tcpTx->rcv_nxt;
-
   /* check fifo, proxy Tx/Rx are connected... */
   if (svm_fifo_max_dequeue (s->rx_fifo) != 0
       || svm_fifo_max_dequeue (s->tx_fifo) != 0)
     {
+      if (!pm->force_stitching)
+        return UPF_PROXY_INPUT_NEXT_TCP_INPUT;
+
       flow->spliced_dirty = 1;
       vlib_increment_simple_counter (&gtm->upf_simple_counters
                                      [UPF_FLOWS_STITCHED_DIRTY_FIFOS],
                                      vlib_get_thread_index (), 0, 1);
     }

+  if (flow_seq_offs (flow, origin) == 0)
+    flow_seq_offs (flow, origin) = direction == FT_ORIGIN ?
+      tcpTx->snd_nxt - tcpRx->rcv_nxt : tcpRx->rcv_nxt - tcpTx->snd_nxt;
+
+  if (flow_seq_offs (flow, reverse) == 0)
+    flow_seq_offs (flow, reverse) = direction == FT_ORIGIN ?
+      tcpTx->rcv_nxt - tcpRx->snd_nxt : tcpRx->snd_nxt - tcpTx->rcv_nxt;
+
   /* kill the TCP connections, session and proxy session */
   kill_connection_hard (tcpRx);
   kill_connection_hard (tcpTx);
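+
+  /* With the proxy-side connections killed, subsequent packets of this
+   * flow bypass the TCP stack and are stitched directly, with sequence
+   * numbers adjusted by the offsets recorded above. */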