Skip to content

Commit

Permalink
Add retries in checkIPerf function
Browse files (browse the repository at this point in the history)
Add iperf server readiness check

Signed-off-by: Pau Capdevila <[email protected]>
  • Loading branch information
pau-hedgehog committed Jan 27, 2025
1 parent 445d5e7 commit c090a91
Showing 1 changed file with 64 additions and 54 deletions.
118 changes: 64 additions & 54 deletions pkg/hhfab/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -1187,74 +1187,84 @@ func checkIPerf(ctx context.Context, opts TestConnectivityOpts, iperfs *semaphor
if opts.IPerfsSeconds <= 0 || !expected {
return nil
}
maxRetries := 3
var lastErr error
for attempt := 1; attempt <= maxRetries; attempt++ {
if err := iperfs.Acquire(ctx, 1); err != nil {
return fmt.Errorf("acquiring iperf semaphore: %w", err)
}

if err := iperfs.Acquire(ctx, 1); err != nil {
return fmt.Errorf("acquiring iperf semaphore: %w", err)
}
defer iperfs.Release(1)

ctx, cancel := context.WithTimeout(ctx, time.Duration(opts.IPerfsSeconds+30)*time.Second)
defer cancel()
err := func() error {
defer iperfs.Release(1)
attemptCtx, cancel := context.WithTimeout(ctx, time.Duration(opts.IPerfsSeconds+30)*time.Second)
defer cancel()

slog.Debug("Running iperf", "from", from, "to", to)
slog.Debug("Running iperf", "from", from, "to", to, "attempt", attempt)
g, attemptCtx := errgroup.WithContext(attemptCtx)

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
out, err := toSSH.RunContext(attemptCtx, fmt.Sprintf("toolbox -q timeout -v %d iperf3 -s -1", opts.IPerfsSeconds+25))
if err != nil {
return fmt.Errorf("running iperf server: %w: %s", err, string(out))
}
return nil
})

g.Go(func() error {
maxRetries := 3
var lastErr error
for attempt := 1; attempt <= maxRetries; attempt++ {
slog.Debug("Starting iperf3 server", "server", to, "attempt", attempt)
out, err := toSSH.RunContext(ctx, fmt.Sprintf("toolbox -q timeout -v %d iperf3 -s -1", opts.IPerfsSeconds+25))
if err == nil {
slog.Debug("iperf3 server started successfully", "server", to)
g.Go(func() error {
for retries := 0; retries < 10; retries++ {
cmd := fmt.Sprintf("nc -zv %s 5201", toIP.String())
out, err := fromSSH.RunContext(attemptCtx, cmd)
if err == nil {
slog.Debug("iperf3 server is ready", "server", to)
break
}
slog.Debug("iperf3 server not ready, retrying", "server", to, "retry", retries+1, "error", err, "output", string(out))
time.Sleep(1 * time.Second)
}
return nil
}
slog.Warn("iperf3 server failed", "server", to, "attempt", attempt, "error", err)
lastErr = fmt.Errorf("running iperf server: %w: %s", err, string(out))
time.Sleep(2 * time.Second) // Backoff between retries
}
return fmt.Errorf("iperf3 server failed after %d attempts: %w", maxRetries, lastErr)
})
})

g.Go(func() error {
time.Sleep(1 * time.Second) // TODO think about more reliable way to wait for server to start
g.Go(func() error {
cmd := fmt.Sprintf("toolbox -q timeout -v %d iperf3 -P 4 -J -c %s -t %d", opts.IPerfsSeconds+25, toIP.String(), opts.IPerfsSeconds)
slog.Debug("Running iperf3 client", "from", from, "to", to, "cmd", cmd)
out, err := fromSSH.RunContext(attemptCtx, cmd)
if err != nil {
return fmt.Errorf("running iperf client: %w: %s", err, string(out))
}
report, err := parseIPerf3Report(out)
if err != nil {
return fmt.Errorf("parsing iperf report: %w", err)
}

cmd := fmt.Sprintf("toolbox -q timeout -v %d iperf3 -P 4 -J -c %s -t %d", opts.IPerfsSeconds+25, toIP.String(), opts.IPerfsSeconds)
out, err := fromSSH.RunContext(ctx, cmd)
if err != nil {
return fmt.Errorf("running iperf client: %w: %s", err, string(out))
}
slog.Debug("IPerf3 result", "from", from, "to", to,
"sendSpeed", asMbps(report.End.SumSent.BitsPerSecond),
"receiveSpeed", asMbps(report.End.SumReceived.BitsPerSecond),
"sent", asMB(float64(report.End.SumSent.Bytes)),
"received", asMB(float64(report.End.SumReceived.Bytes)))

report, err := parseIPerf3Report(out)
if err != nil {
return fmt.Errorf("parsing iperf report: %w", err)
}
if opts.IPerfsMinSpeed > 0 {
if report.End.SumSent.BitsPerSecond < opts.IPerfsMinSpeed*1_000_000 {
return fmt.Errorf("iperf send speed too low: %s < %s", asMbps(report.End.SumSent.BitsPerSecond), asMbps(opts.IPerfsMinSpeed*1_000_000))
}
if report.End.SumReceived.BitsPerSecond < opts.IPerfsMinSpeed*1_000_000 {
return fmt.Errorf("iperf receive speed too low: %s < %s", asMbps(report.End.SumReceived.BitsPerSecond), asMbps(opts.IPerfsMinSpeed*1_000_000))
}
}
return nil
})

slog.Debug("IPerf3 result", "from", from, "to", to,
"sendSpeed", asMbps(report.End.SumSent.BitsPerSecond),
"receiveSpeed", asMbps(report.End.SumReceived.BitsPerSecond),
"sent", asMB(float64(report.End.SumSent.Bytes)),
"received", asMB(float64(report.End.SumReceived.Bytes)),
)
return g.Wait()
}()

if opts.IPerfsMinSpeed > 0 {
if report.End.SumSent.BitsPerSecond < opts.IPerfsMinSpeed*1_000_000 {
return fmt.Errorf("iperf send speed too low: %s < %s", asMbps(report.End.SumSent.BitsPerSecond), asMbps(opts.IPerfsMinSpeed*1_000_000))
}
if report.End.SumReceived.BitsPerSecond < opts.IPerfsMinSpeed*1_000_000 {
return fmt.Errorf("iperf receive speed too low: %s < %s", asMbps(report.End.SumReceived.BitsPerSecond), asMbps(opts.IPerfsMinSpeed*1_000_000))
}
if err != nil {
slog.Warn("iperf attempt failed", "from", from, "to", to, "attempt", attempt, "error", err)
lastErr = err
continue
}

return nil
})

if err := g.Wait(); err != nil {
return fmt.Errorf("running iperf: %w", err)
}

return nil
return fmt.Errorf("iperf test failed after %d attempts: %w", maxRetries, lastErr)
}

func checkCurl(ctx context.Context, opts TestConnectivityOpts, curls *semaphore.Weighted, from string, fromSSH *goph.Client, toIP string, expected bool) error {
Expand Down

0 comments on commit c090a91

Please sign in to comment.