Skip to content

Commit

Permalink
Fix analysis with errors (#101)
Browse files Browse the repository at this point in the history
Requests with errors have a different size, so requests were seen as having multiple sizes.

Size 0 could not be placed in a bucket, so analysis would take hours.

Fixed by excluding size 0 from buckets and, furthermore, by filtering out requests with errors.
  • Loading branch information
klauspost authored Apr 27, 2020
1 parent 524283a commit 87b642f
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 12 deletions.
31 changes: 21 additions & 10 deletions pkg/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package aggregate

import (
"fmt"
"time"

"github.com/minio/warp/pkg/bench"
Expand Down Expand Up @@ -120,6 +121,24 @@ func Aggregate(o bench.Operations, segmentDur, skipDur time.Duration) Aggregated
ops = ops.FilterInsideRange(start, end)
}

if errs := ops.FilterErrors(); len(errs) > 0 {
a.Errors = len(errs)
for _, err := range errs {
if len(a.FirstErrors) >= 10 {
break
}
a.FirstErrors = append(a.FirstErrors, fmt.Sprintf("%s, %s: %v", err.Endpoint, err.End.Round(time.Second), err.Err))
}
}

// Remove errored request from further analysis
allOps := ops
ops = ops.FilterSuccessful()
if len(ops) == 0 {
a.Skipped = true
continue
}

segs := ops.Segment(bench.SegmentOptions{
From: time.Time{},
PerSegDuration: segmentDur,
Expand All @@ -142,15 +161,6 @@ func Aggregate(o bench.Operations, segmentDur, skipDur time.Duration) Aggregated
a.Concurrency = ops.Threads()
a.Hosts = ops.Hosts()

if errs := ops.Errors(); len(errs) > 0 {
a.Errors = len(errs)
for _, err := range errs {
if len(a.FirstErrors) >= 10 {
break
}
a.FirstErrors = append(a.FirstErrors, err)
}
}
if !ops.MultipleSizes() {
a.SingleSizedRequests = RequestAnalysisSingleSized(ops, !isMixed)
} else {
Expand All @@ -160,7 +170,8 @@ func Aggregate(o bench.Operations, segmentDur, skipDur time.Duration) Aggregated
eps := ops.Endpoints()
a.ThroughputByHost = make(map[string]Throughput, len(eps))
for _, ep := range eps {
ops := ops.FilterByEndpoint(ep)
// Use all ops to include errors.
ops := allOps.FilterByEndpoint(ep)
total := ops.Total(false)
var host Throughput
host.fill(total)
Expand Down
21 changes: 19 additions & 2 deletions pkg/bench/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (o Operation) Aggregate(s *Segment) {
if startedInSegment {
s.OpsStarted++
if len(o.Err) != 0 {
// Errors are only counted in segments they started in.
// Errors are only counted in segments they end in.
return
}

Expand Down Expand Up @@ -481,7 +481,7 @@ func (o Operations) MultipleSizes() bool {
}
sz := o[0].Size
for _, op := range o {
if op.Size != sz {
if len(op.Err) != 0 && op.Size != sz {
return true
}
}
Expand Down Expand Up @@ -612,6 +612,9 @@ func (o Operations) SplitSizes(minShare float64) []SizeSegment {
}
var res []SizeSegment
minSz, maxSz := o.MinMaxSize()
if minSz == 0 {
minSz = 1
}
minLog := int(math.Log10(float64(minSz)))
maxLog := int(math.Log10(float64(maxSz)))
cLog := minLog
Expand Down Expand Up @@ -806,6 +809,20 @@ func (o Operations) Errors() []string {
return errs
}

// FilterSuccessful returns the operations that completed without an error.
// A nil slice is returned when o is empty; otherwise the result is a new
// slice and o is left unmodified.
func (o Operations) FilterSuccessful() Operations {
	if len(o) == 0 {
		return nil
	}
	// Pre-size to the upper bound so appends never reallocate.
	ok := make(Operations, 0, len(o))
	for _, op := range o {
		// An empty Err string marks a successful operation.
		if len(op.Err) == 0 {
			ok = append(ok, op)
		}
	}
	return ok
}

// FilterErrors returns the operations that failed with an error.
func (o Operations) FilterErrors() Operations {
if len(o) == 0 {
Expand Down

0 comments on commit 87b642f

Please sign in to comment.