diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 2f16c85c..a946a409 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -50,11 +50,5 @@ jobs: - name: Lint run: | - curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.51.2 + curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin $(go env GOPATH)/bin/golangci-lint run --timeout=5m --config ./.golangci.yml - - - name: Staticcheck - uses: dominikh/staticcheck-action@v1.3.0 - with: - version: "2023.1.2" - install-go: false diff --git a/.golangci.yml b/.golangci.yml index 14786fe2..fd281eeb 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,29 +1,36 @@ linters-settings: - golint: - min-confidence: 0 + gofumpt: + simplify: true misspell: locale: US + staticcheck: + checks: ['all', '-ST1005', '-ST1000', '-SA4000', '-SA9004', '-SA1019', '-SA1008', '-U1000', '-ST1016'] + linters: disable-all: true enable: - - typecheck + - durationcheck + - gocritic + - gofmt + - gofumpt - goimports - - misspell + - gomodguard - govet - - revive - ineffassign - - gosimple + - misspell + - revive + - staticcheck + - tenv + - typecheck + - unconvert - unused - - gofumpt + - prealloc issues: exclude-use-default: false exclude: - - should have a package comment - - comment on exported method - - should have comment or be unexported - - error strings should not be capitalized or end with punctuation or a newline -service: - golangci-lint-version: 1.51.2 # use the fixed version to not introduce new linters unexpectedly + - should have comment or be unexported + - should have a package comment + - error strings should not be capitalized or end with punctuation or a newline diff --git a/api/api.go b/api/api.go index 6ce9933c..a5a0affd 100644 --- a/api/api.go +++ b/api/api.go @@ -41,11 +41,11 @@ type BenchmarkStatus = struct { // Any non-fatal error during the run. Error string `json:"error"` - // Will be true when benchmark has finished and data is ready. - DataReady bool `json:"data_ready"` - // Base filename of the Filename string `json:"filename,omitempty"` + + // Will be true when benchmark has finished and data is ready. + DataReady bool `json:"data_ready"` } // Operations contains raw benchmark operations. @@ -56,22 +56,23 @@ type Operations struct { // Server contains the state of the running server. type Server struct { - status BenchmarkStatus - ops bench.Operations - agrr *aggregate.Aggregated - aggrDur time.Duration - server *http.Server - cmdLine string - // Shutting down ctx context.Context + agrr *aggregate.Aggregated + server *http.Server cancel context.CancelFunc - // lock for Server - mu sync.Mutex // Parent loggers infoln func(data ...interface{}) errorln func(data ...interface{}) + status BenchmarkStatus + cmdLine string + + ops bench.Operations + aggrDur time.Duration + + // lock for Server + mu sync.Mutex } // OperationsReady can be used to send benchmark data to the server. 
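The field reshuffling in BenchmarkStatus and Server above (and in most structs later in this patch) is consistent with struct-alignment cleanup: Go pads structs so each field sits at its natural alignment, so ordering fields roughly from largest to smallest keeps padding at the tail and shrinks the struct. A minimal sketch of the effect, assuming a 64-bit platform (type names are illustrative, not from warp):

package main

import (
	"fmt"
	"unsafe"
)

// Padded interleaves small and large fields, so the compiler must pad
// before the 8-byte field and again at the end.
type Padded struct {
	ready bool  // 1 byte + 7 bytes padding
	n     int64 // 8 bytes
	done  bool  // 1 byte + 7 bytes trailing padding
}

// Packed keeps the 8-byte field first; only trailing padding remains.
type Packed struct {
	n     int64 // 8 bytes
	ready bool  // 1 byte
	done  bool  // 1 byte + 6 bytes trailing padding
}

func main() {
	fmt.Println(unsafe.Sizeof(Padded{}), unsafe.Sizeof(Packed{})) // 24 16
}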
diff --git a/cli/analyze.go b/cli/analyze.go index b1f73247..984c3234 100644 --- a/cli/analyze.go +++ b/cli/analyze.go @@ -452,7 +452,7 @@ func writeSegs(ctx *cli.Context, wrSegs io.Writer, ops bench.Operations, allThre } } -func printRequestAnalysis(ctx *cli.Context, ops aggregate.Operation, details bool) { +func printRequestAnalysis(_ *cli.Context, ops aggregate.Operation, details bool) { console.SetColor("Print", color.New(color.FgHiWhite)) if ops.SingleSizedRequests != nil { diff --git a/cli/benchclient.go b/cli/benchclient.go index b06c5047..c335a6d6 100644 --- a/cli/benchclient.go +++ b/cli/benchclient.go @@ -43,16 +43,16 @@ const ( // clientReply contains the response to a server request. type clientReply struct { - Type clientReplyType `json:"type"` - Time time.Time `json:"time"` - Err string `json:"err,omitempty"` - Ops bench.Operations `json:"ops,omitempty"` + Time time.Time `json:"time"` StageInfo struct { + Custom map[string]string `json:"custom,omitempty"` + Progress float64 `json:"progress"` Started bool `json:"started"` Finished bool `json:"finished"` - Progress float64 `json:"progress"` - Custom map[string]string `json:"custom,omitempty"` } `json:"stage_info"` + Type clientReplyType `json:"type"` + Err string `json:"err,omitempty"` + Ops bench.Operations `json:"ops,omitempty"` } // executeBenchmark will execute the benchmark and return any error. diff --git a/cli/benchmark.go b/cli/benchmark.go index 7885214e..7701b2ad 100644 --- a/cli/benchmark.go +++ b/cli/benchmark.go @@ -278,21 +278,21 @@ var ( ) type clientBenchmark struct { - sync.Mutex ctx context.Context - cancel context.CancelFunc - results bench.Operations err error - stage benchmarkStage + cancel context.CancelFunc info map[benchmarkStage]stageInfo + stage benchmarkStage + results bench.Operations clientIdx int + sync.Mutex } type stageInfo struct { - startRequested bool start chan struct{} done chan struct{} custom map[string]string + startRequested bool } func (c *clientBenchmark) init(ctx context.Context) { @@ -476,7 +476,7 @@ func startProfiling(ctx2 context.Context, ctx *cli.Context) (*runningProfiles, e return &r, nil } -func (rp *runningProfiles) stop(ctx2 context.Context, ctx *cli.Context, fileName string) { +func (rp *runningProfiles) stop(ctx2 context.Context, _ *cli.Context, fileName string) { if rp == nil || rp.client == nil { return } diff --git a/cli/benchserver.go b/cli/benchserver.go index 12bcba93..86936e4c 100644 --- a/cli/benchserver.go +++ b/cli/benchserver.go @@ -74,15 +74,15 @@ func (s serverInfo) validate() error { // serverRequest requests an operation from the client and expects a response. type serverRequest struct { - Operation serverRequestOp `json:"op"` Benchmark struct { + Flags map[string]string `json:"flags"` Command string `json:"command"` Args cli.Args `json:"args"` - Flags map[string]string `json:"flags"` } - Stage benchmarkStage `json:"stage"` - StartTime time.Time `json:"start_time"` - ClientIdx int `json:"client_idx"` + StartTime time.Time `json:"start_time"` + Operation serverRequestOp `json:"op"` + Stage benchmarkStage `json:"stage"` + ClientIdx int `json:"client_idx"` } // runServerBenchmark will run a benchmark server if requested. @@ -236,11 +236,11 @@ func runServerBenchmark(ctx *cli.Context, b bench.Benchmark) (bool, error) { // connections keeps track of connections to clients. 
type connections struct { + info func(data ...interface{}) + errLn func(data ...interface{}) hosts []string ws []*websocket.Conn si serverInfo - info func(data ...interface{}) - errLn func(data ...interface{}) } // newConnections creates connections (but does not connect) to clients. diff --git a/cli/cli.go b/cli/cli.go index 074b98c0..786b1a5f 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -104,7 +104,7 @@ func init() { mergeCmd, clientCmd, } - appCmds = append(a, b...) + appCmds = append(append(appCmds, a...), b...) benchCmds = a } @@ -265,7 +265,7 @@ func getSystemData() map[string]string { } // Function invoked when invalid command is passed. -func commandNotFound(ctx *cli.Context, command string) { +func commandNotFound(_ *cli.Context, command string) { msg := fmt.Sprintf("`%s` is not a %s command. See `m3 --help`.", command, appName) closestCommands := findClosestCommands(command) if len(closestCommands) > 0 { diff --git a/cli/clientmode.go b/cli/clientmode.go index 926d0a7c..049fd794 100644 --- a/cli/clientmode.go +++ b/cli/clientmode.go @@ -75,5 +75,5 @@ func mainClient(ctx *cli.Context) error { return nil } -func checkClientSyntax(ctx *cli.Context) { +func checkClientSyntax(_ *cli.Context) { } diff --git a/cli/delete.go b/cli/delete.go index 97034507..fce88296 100644 --- a/cli/delete.go +++ b/cli/delete.go @@ -63,17 +63,9 @@ FLAGS: // mainDelete is the entry point for get command. func mainDelete(ctx *cli.Context) error { checkDeleteSyntax(ctx) - src := newGenSource(ctx, "obj.size") b := bench.Delete{ - Common: bench.Common{ - Client: newClient(ctx), - Concurrency: ctx.Int("concurrent"), - Source: src, - Bucket: ctx.String("bucket"), - Location: "", - PutOpts: putOpts(ctx), - }, + Common: getCommon(ctx, newGenSource(ctx, "obj.size")), CreateObjects: ctx.Int("objects"), BatchSize: ctx.Int("batch"), } diff --git a/cli/flags.go b/cli/flags.go index 582c4f5d..790d43a8 100644 --- a/cli/flags.go +++ b/cli/flags.go @@ -23,6 +23,8 @@ import ( "github.com/minio/cli" "github.com/minio/pkg/console" + "github.com/minio/warp/pkg/bench" + "github.com/minio/warp/pkg/generator" ) // Collection of warp flags currently supported @@ -227,4 +229,20 @@ var ioFlags = []cli.Flag{ Usage: "enable HTTP2 support if server supports it", Hidden: true, }, + cli.BoolFlag{ + Name: "stress", + Usage: "stress test only and discard output", + }, +} + +func getCommon(ctx *cli.Context, src func() generator.Source) bench.Common { + return bench.Common{ + Client: newClient(ctx), + Concurrency: ctx.Int("concurrent"), + Source: src, + Bucket: ctx.String("bucket"), + Location: ctx.String("region"), + PutOpts: putOpts(ctx), + DiscardOutput: ctx.Bool("stress"), + } } diff --git a/cli/get.go b/cli/get.go index b8ef6188..40ebdbfa 100644 --- a/cli/get.go +++ b/cli/get.go @@ -75,17 +75,9 @@ FLAGS: // mainGet is the entry point for get command. func mainGet(ctx *cli.Context) error { checkGetSyntax(ctx) - src := newGenSource(ctx, "obj.size") sse := newSSE(ctx) b := bench.Get{ - Common: bench.Common{ - Client: newClient(ctx), - Concurrency: ctx.Int("concurrent"), - Source: src, - Bucket: ctx.String("bucket"), - Location: "", - PutOpts: putOpts(ctx), - }, + Common: getCommon(ctx, newGenSource(ctx, "obj.size")), Versions: ctx.Int("versions"), RandomRanges: ctx.Bool("range"), CreateObjects: ctx.Int("objects"), diff --git a/cli/list.go b/cli/list.go index e359ed51..01ccdb62 100644 --- a/cli/list.go +++ b/cli/list.go @@ -66,17 +66,9 @@ FLAGS: // mainDelete is the entry point for get command. 
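The cli/cli.go hunk above replaces appCmds = append(a, b...) with a build from a fresh slice. The original form can reuse a's backing array when a has spare capacity, so appCmds and benchCmds (which is assigned a) may alias, and a later append or write through one slice can show up in the other. A small, self-contained demonstration of that gotcha (the values are made up, not warp code):

package main

import "fmt"

func main() {
	// a has spare capacity, so append may write into a's backing array.
	a := make([]string, 2, 4)
	a[0], a[1] = "analyze", "cmp"

	x := append(a, "get")   // shares a's array (len 3 fits in cap 4)
	y := append(a, "put")   // writes the same third slot
	fmt.Println(x[2], y[2]) // "put put" – x was silently overwritten

	// Starting from a fresh slice guarantees an independent array,
	// which is what the patched appCmds construction does.
	var z []string
	z = append(append(z, a...), "get")
	_ = append(a, "put")
	fmt.Println(z[2]) // still "get"
}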
func mainList(ctx *cli.Context) error { checkListSyntax(ctx) - src := newGenSource(ctx, "obj.size") b := bench.List{ - Common: bench.Common{ - Client: newClient(ctx), - Concurrency: ctx.Int("concurrent"), - Source: src, - Bucket: ctx.String("bucket"), - Location: "", - PutOpts: putOpts(ctx), - }, + Common: getCommon(ctx, newGenSource(ctx, "obj.size")), Versions: ctx.Int("versions"), Metadata: ctx.Bool("metadata"), CreateObjects: ctx.Int("objects"), diff --git a/cli/merge.go b/cli/merge.go index c54b5872..f6130080 100644 --- a/cli/merge.go +++ b/cli/merge.go @@ -116,5 +116,5 @@ func mainMerge(ctx *cli.Context) error { return nil } -func checkMerge(ctx *cli.Context) { +func checkMerge(_ *cli.Context) { } diff --git a/cli/mixed.go b/cli/mixed.go index d724d67b..302ffe23 100644 --- a/cli/mixed.go +++ b/cli/mixed.go @@ -81,7 +81,6 @@ FLAGS: // mainMixed is the entry point for mixed command. func mainMixed(ctx *cli.Context) error { checkMixedSyntax(ctx) - src := newGenSource(ctx, "obj.size") sse := newSSE(ctx) dist := bench.MixedDistribution{ Distribution: map[string]float64{ @@ -94,14 +93,7 @@ func mainMixed(ctx *cli.Context) error { err := dist.Generate(ctx.Int("objects") * 2) fatalIf(probe.NewError(err), "Invalid distribution") b := bench.Mixed{ - Common: bench.Common{ - Client: newClient(ctx), - Concurrency: ctx.Int("concurrent"), - Source: src, - Bucket: ctx.String("bucket"), - Location: "", - PutOpts: putOpts(ctx), - }, + Common: getCommon(ctx, newGenSource(ctx, "obj.size")), CreateObjects: ctx.Int("objects"), GetOpts: minio.GetObjectOptions{ServerSideEncryption: sse}, StatOpts: minio.StatObjectOptions{ diff --git a/cli/multipart.go b/cli/multipart.go index d66d3198..44882bb9 100644 --- a/cli/multipart.go +++ b/cli/multipart.go @@ -78,21 +78,14 @@ FLAGS: // mainPut is the entry point for cp command. 
func mainMultipart(ctx *cli.Context) error { checkMultipartSyntax(ctx) - src := newGenSource(ctx, "part.size") b := bench.Multipart{ - Common: bench.Common{ - Client: newClient(ctx), - Concurrency: ctx.Int("concurrent"), - Source: src, - Bucket: ctx.String("bucket"), - Location: "", - PutOpts: multipartOpts(ctx), - }, + Common: getCommon(ctx, newGenSource(ctx, "obj.size")), ObjName: ctx.String("obj.name"), PartStart: ctx.Int("_part-start"), UploadID: ctx.String("_upload-id"), CreateParts: ctx.Int("parts"), } + b.PutOpts = multipartOpts(ctx) if b.UploadID == "" { err := b.InitOnce(context.Background()) if err != nil { diff --git a/cli/print.go b/cli/print.go index 9bdb4fa5..fcb27da9 100644 --- a/cli/print.go +++ b/cli/print.go @@ -31,17 +31,17 @@ import ( // causeMessage container for golang error messages type causeMessage struct { - Message string `json:"message"` Error error `json:"error"` + Message string `json:"message"` } // errorMessage container for error messages type errorMessage struct { - Message string `json:"message"` Cause causeMessage `json:"cause"` + SysInfo map[string]string `json:"sysinfo"` + Message string `json:"message"` Type string `json:"type"` CallTrace []probe.TracePoint `json:"trace,omitempty"` - SysInfo map[string]string `json:"sysinfo"` } var printMu sync.Mutex @@ -93,8 +93,8 @@ func fatal(err *probe.Error, msg string, data ...interface{}) { errorMsg.CallTrace = err.CallTrace } json, e := json.MarshalIndent(struct { - Status string `json:"status"` Error errorMessage `json:"error"` + Status string `json:"status"` }{ Status: "error", Error: errorMsg, @@ -155,8 +155,8 @@ func errorIf(err *probe.Error, msg string, data ...interface{}) { errorMsg.CallTrace = err.CallTrace } json, e := json.MarshalIndent(struct { - Status string `json:"status"` Error errorMessage `json:"error"` + Status string `json:"status"` }{ Status: "error", Error: errorMsg, diff --git a/cli/progress-bar.go b/cli/progress-bar.go index 26680cfb..02ac9a59 100644 --- a/cli/progress-bar.go +++ b/cli/progress-bar.go @@ -43,7 +43,7 @@ func newProgressBar(total int64, units pb.Units) *progressBar { bar := pb.New64(total) // Set new human friendly print units. - bar.SetUnits(pb.U_DURATION) + bar.SetUnits(units) // Refresh rate for progress bar is set to 125 milliseconds. bar.SetRefreshRate(time.Millisecond * 125) diff --git a/cli/put.go b/cli/put.go index d7051cf3..a25b6c10 100644 --- a/cli/put.go +++ b/cli/put.go @@ -60,16 +60,8 @@ FLAGS: // mainPut is the entry point for cp command. func mainPut(ctx *cli.Context) error { checkPutSyntax(ctx) - src := newGenSource(ctx, "obj.size") b := bench.Put{ - Common: bench.Common{ - Client: newClient(ctx), - Concurrency: ctx.Int("concurrent"), - Source: src, - Bucket: ctx.String("bucket"), - Location: "", - PutOpts: putOpts(ctx), - }, + Common: getCommon(ctx, newGenSource(ctx, "obj.size")), } return runBench(ctx, &b) } diff --git a/cli/retention.go b/cli/retention.go index a66570ea..78ee85de 100644 --- a/cli/retention.go +++ b/cli/retention.go @@ -62,20 +62,12 @@ FLAGS: // mainGet is the entry point for get command. 
func mainRetention(ctx *cli.Context) error { checkRetentionSyntax(ctx) - src := newGenSource(ctx, "obj.size") b := bench.Retention{ - Common: bench.Common{ - Client: newClient(ctx), - Concurrency: ctx.Int("concurrent"), - Source: src, - Bucket: ctx.String("bucket"), - Location: "", - PutOpts: putOpts(ctx), - Locking: true, - }, + Common: getCommon(ctx, newGenSource(ctx, "obj.size")), CreateObjects: ctx.Int("objects"), Versions: ctx.Int("versions"), } + b.Locking = true return runBench(ctx, &b) } diff --git a/cli/select.go b/cli/select.go index 957c4d5f..50064ba0 100644 --- a/cli/select.go +++ b/cli/select.go @@ -1,5 +1,5 @@ /* - * Warp (C) 2020 MinIO, Inc. + * Warp (C) 2023 MinIO, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by @@ -62,17 +62,9 @@ FLAGS: // mainSelect is the entry point for select command. func mainSelect(ctx *cli.Context) error { checkSelectSyntax(ctx) - src := newGenSourceCSV(ctx) sse := newSSE(ctx) b := bench.Select{ - Common: bench.Common{ - Client: newClient(ctx), - Concurrency: ctx.Int("concurrent"), - Source: src, - Bucket: ctx.String("bucket"), - Location: "", - PutOpts: putOpts(ctx), - }, + Common: getCommon(ctx, newGenSourceCSV(ctx)), CreateObjects: ctx.Int("objects"), SelectOpts: minio.SelectObjectOptions{ Expression: ctx.String("query"), diff --git a/cli/snowball.go b/cli/snowball.go index e0f48ec9..ae67b3a0 100644 --- a/cli/snowball.go +++ b/cli/snowball.go @@ -63,20 +63,13 @@ FLAGS: // mainPut is the entry point for cp command. func mainSnowball(ctx *cli.Context) error { checkSnowballSyntax(ctx) - src := newGenSource(ctx, "obj.size") b := bench.Snowball{ - Common: bench.Common{ - Client: newClient(ctx), - Concurrency: ctx.Int("concurrent"), - Source: src, - Bucket: ctx.String("bucket"), - Location: "", - PutOpts: snowballOpts(ctx), - }, + Common: getCommon(ctx, newGenSource(ctx, "obj.size")), Compress: ctx.Bool("compress"), Duplicate: ctx.Bool("compress"), NumObjs: ctx.Int("objs.per"), } + b.PutOpts = snowballOpts(ctx) if b.Compress { sz, err := toSize(ctx.String("obj.size")) if err != nil { diff --git a/cli/stat.go b/cli/stat.go index bfb1962a..65a16483 100644 --- a/cli/stat.go +++ b/cli/stat.go @@ -63,18 +63,10 @@ FLAGS: // mainDelete is the entry point for get command. func mainStat(ctx *cli.Context) error { checkStatSyntax(ctx) - src := newGenSource(ctx, "obj.size") sse := newSSE(ctx) b := bench.Stat{ - Common: bench.Common{ - Client: newClient(ctx), - Concurrency: ctx.Int("concurrent"), - Source: src, - Bucket: ctx.String("bucket"), - Location: "", - PutOpts: putOpts(ctx), - }, + Common: getCommon(ctx, newGenSource(ctx, "obj.size")), Versions: ctx.Int("versions"), CreateObjects: ctx.Int("objects"), StatOpts: minio.StatObjectOptions{ diff --git a/cli/versioned.go b/cli/versioned.go index 83f91239..2d0a0138 100644 --- a/cli/versioned.go +++ b/cli/versioned.go @@ -81,7 +81,6 @@ FLAGS: // mainVersioned is the entry point for mixed command. 
func mainVersioned(ctx *cli.Context) error { checkVersionedSyntax(ctx) - src := newGenSource(ctx, "obj.size") sse := newSSE(ctx) dist := bench.VersionedDistribution{ Distribution: map[string]float64{ @@ -94,14 +93,7 @@ func mainVersioned(ctx *cli.Context) error { err := dist.Generate(ctx.Int("objects") * 2) fatalIf(probe.NewError(err), "Invalid distribution") b := bench.Versioned{ - Common: bench.Common{ - Client: newClient(ctx), - Concurrency: ctx.Int("concurrent"), - Source: src, - Bucket: ctx.String("bucket"), - Location: "", - PutOpts: putOpts(ctx), - }, + Common: getCommon(ctx, newGenSource(ctx, "obj.size")), CreateObjects: ctx.Int("objects"), GetOpts: minio.GetObjectOptions{ServerSideEncryption: sse}, StatOpts: minio.StatObjectOptions{ diff --git a/cli/zip.go b/cli/zip.go index ee65aadf..c8b79fcc 100644 --- a/cli/zip.go +++ b/cli/zip.go @@ -67,20 +67,12 @@ FLAGS: func mainZip(ctx *cli.Context) error { checkZipSyntax(ctx) ctx.Set("noprefix", "true") - src := newGenSource(ctx, "obj.size") b := bench.S3Zip{ - Common: bench.Common{ - Client: newClient(ctx), - Concurrency: ctx.Int("concurrent"), - Source: src, - Bucket: ctx.String("bucket"), - Location: "", - PutOpts: putOpts(ctx), - Locking: true, - }, + Common: getCommon(ctx, newGenSource(ctx, "obj.size")), CreateFiles: ctx.Int("files"), ZipObjName: fmt.Sprintf("%d.zip", time.Now().UnixNano()), } + b.Locking = true return runBench(ctx, &b) } diff --git a/pkg/aggregate/aggregate.go b/pkg/aggregate/aggregate.go index f6381d98..083f2b5c 100644 --- a/pkg/aggregate/aggregate.go +++ b/pkg/aggregate/aggregate.go @@ -27,57 +27,57 @@ import ( // Aggregated contains aggregated data for a single benchmark run. type Aggregated struct { - Type string `json:"type"` - Mixed bool `json:"mixed"` - Operations []Operation `json:"operations,omitempty"` // MixedServerStats and MixedThroughputByHost is populated only when data is mixed. MixedServerStats *Throughput `json:"mixed_server_stats,omitempty"` MixedThroughputByHost map[string]Throughput `json:"mixed_throughput_by_host,omitempty"` + Type string `json:"type"` + Operations []Operation `json:"operations,omitempty"` + Mixed bool `json:"mixed"` } // Operation returns statistics for a single operation type. type Operation struct { - // Operation type - Type string `json:"type"` - // N is the number of operations. - N int `json:"n"` - // Skipped if too little data - Skipped bool `json:"skipped"` + // Throughput information. + Throughput Throughput `json:"throughput"` // Unfiltered start time of this operation segment. StartTime time.Time `json:"start_time"` // Unfiltered end time of this operation segment. EndTime time.Time `json:"end_time"` - // Objects per operation. - ObjectsPerOperation int `json:"objects_per_operation"` - // Concurrency - total number of threads running. - Concurrency int `json:"concurrency"` - // Number of warp clients. - Clients int `json:"clients"` - // Numbers of hosts - Hosts int `json:"hosts"` - // HostNames are sorted names of hosts - HostNames []string `json:"host_names"` - // Populated if requests are all of same object size. - SingleSizedRequests *SingleSizedRequests `json:"single_sized_requests,omitempty"` + // Throughput by host. + ThroughputByHost map[string]Throughput `json:"throughput_by_host"` // Populated if requests are of difference object sizes. MultiSizedRequests *MultiSizedRequests `json:"multi_sized_requests,omitempty"` - // Total errors recorded. - Errors int `json:"errors"` + // Populated if requests are all of same object size. 
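The command entry points converted above all drop their hand-rolled bench.Common literals in favor of the getCommon helper added in cli/flags.go, so shared settings (client, concurrency, bucket, the region flag that was previously hard-coded to "", and the new --stress flag) are read in one place, with command-specific fields assigned afterwards. A rough sketch of that embed-then-override pattern, using placeholder types rather than the real cli/bench packages:

package main

import "fmt"

// Config mirrors the role of bench.Common: shared settings every
// benchmark needs (field names here are illustrative).
type Config struct {
	Bucket      string
	Concurrency int
	Discard     bool
	Locking     bool
}

// newConfig plays the role of getCommon: one constructor that reads the
// shared flags so no command re-implements (or forgets) them.
func newConfig(bucket string, concurrency int, stress bool) Config {
	return Config{Bucket: bucket, Concurrency: concurrency, Discard: stress}
}

// RetentionBench embeds the shared config, like bench.Retention embeds Common.
type RetentionBench struct {
	Config
	Versions int
}

func main() {
	b := RetentionBench{Config: newConfig("warp-bench", 16, false), Versions: 3}
	// Command-specific tweaks happen after construction, analogous to
	// b.Locking = true in cli/retention.go.
	b.Locking = true
	fmt.Printf("%+v\n", b)
}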
+ SingleSizedRequests *SingleSizedRequests `json:"single_sized_requests,omitempty"` + // Operation type + Type string `json:"type"` + // HostNames are sorted names of hosts + HostNames []string `json:"host_names"` // Subset of errors. FirstErrors []string `json:"first_errors"` - // Throughput information. - Throughput Throughput `json:"throughput"` - // Throughput by host. - ThroughputByHost map[string]Throughput `json:"throughput_by_host"` + // Numbers of hosts + Hosts int `json:"hosts"` + // Number of warp clients. + Clients int `json:"clients"` + // Concurrency - total number of threads running. + Concurrency int `json:"concurrency"` + // Total errors recorded. + Errors int `json:"errors"` + // Objects per operation. + ObjectsPerOperation int `json:"objects_per_operation"` + // N is the number of operations. + N int `json:"n"` + // Skipped if too little data + Skipped bool `json:"skipped"` } // SegmentDurFn accepts a total time and should return the duration used for each segment. type SegmentDurFn func(total time.Duration) time.Duration type Options struct { - Prefiltered bool DurFunc SegmentDurFn SkipDur time.Duration + Prefiltered bool } // Aggregate returns statistics when only a single operation was running concurrently. diff --git a/pkg/aggregate/requests.go b/pkg/aggregate/requests.go index 38b0170e..9e285dd2 100644 --- a/pkg/aggregate/requests.go +++ b/pkg/aggregate/requests.go @@ -26,27 +26,26 @@ import ( // SingleSizedRequests contains statistics when all objects have the same size. type SingleSizedRequests struct { - // Skipped if too little data. - Skipped bool `json:"skipped"` + // Request times by host. + ByHost map[string]SingleSizedRequests `json:"by_host,omitempty"` + LastAccess *SingleSizedRequests `json:"last_access,omitempty"` - // Object size per operation. Can be 0. - ObjSize int64 `json:"obj_size"` + // FirstAccess is filled if the same object is accessed multiple times. + // This records the first touch of the object. + FirstAccess *SingleSizedRequests `json:"first_access,omitempty"` - // Total number of requests. - Requests int `json:"requests"` + // Time to first byte if applicable. + FirstByte *TTFB `json:"first_byte,omitempty"` - // Average request duration. - DurAvgMillis int `json:"dur_avg_millis"` + // Host names, sorted. + HostNames []string + + // DurPct is duration percentiles. + DurPct [101]int `json:"dur_percentiles_millis"` // Median request duration. DurMedianMillis int `json:"dur_median_millis"` - // 90% request time. - Dur90Millis int `json:"dur_90_millis"` - - // 99% request time. - Dur99Millis int `json:"dur_99_millis"` - // Fastest request time. FastestMillis int `json:"fastest_millis"` @@ -56,22 +55,23 @@ type SingleSizedRequests struct { // StdDev is the standard deviation of requests. StdDev int `json:"std_dev_millis"` - // DurPct is duration percentiles. - DurPct [101]int `json:"dur_percentiles_millis"` + // 99% request time. + Dur99Millis int `json:"dur_99_millis"` - // Time to first byte if applicable. - FirstByte *TTFB `json:"first_byte,omitempty"` + // 90% request time. + Dur90Millis int `json:"dur_90_millis"` - // FirstAccess is filled if the same object is accessed multiple times. - // This records the first touch of the object. - FirstAccess *SingleSizedRequests `json:"first_access,omitempty"` - LastAccess *SingleSizedRequests `json:"last_access,omitempty"` + // Average request duration. + DurAvgMillis int `json:"dur_avg_millis"` - // Host names, sorted. - HostNames []string + // Total number of requests. 
+ Requests int `json:"requests"` - // Request times by host. - ByHost map[string]SingleSizedRequests `json:"by_host,omitempty"` + // Object size per operation. Can be 0. + ObjSize int64 `json:"obj_size"` + + // Skipped if too little data. + Skipped bool `json:"skipped"` } func (a *SingleSizedRequests) fill(ops bench.Operations) { @@ -106,35 +106,37 @@ func (a *SingleSizedRequests) fillFirstLast(ops bench.Operations) { } type RequestSizeRange struct { - // Number of requests in this range. - Requests int `json:"requests"` - // Minimum size in request size range. - MinSize int `json:"min_size"` + // Time to first byte if applicable. + FirstByte *TTFB `json:"first_byte,omitempty"` + + // FirstAccess is filled if the same object is accessed multiple times. + // This records the first touch of the object. + FirstAccess *RequestSizeRange `json:"first_access,omitempty"` + MinSizeString string `json:"min_size_string"` - // Maximum size in request size range (not included). - MaxSize int `json:"max_size"` MaxSizeString string `json:"max_size_string"` - // Average payload size of requests in bytes. - AvgObjSize int `json:"avg_obj_size"` - AvgDurationMillis int `json:"avg_duration_millis"` + + // BpsPct is BPS percentiles. + BpsPct [101]float64 `json:"bps_percentiles"` + + BpsMedian float64 `json:"bps_median"` + AvgDurationMillis int `json:"avg_duration_millis"` // Stats: BpsAverage float64 `json:"bps_average"` - BpsMedian float64 `json:"bps_median"` + // Number of requests in this range. + Requests int `json:"requests"` Bps90 float64 `json:"bps_90"` Bps99 float64 `json:"bps_99"` BpsFastest float64 `json:"bps_fastest"` BpsSlowest float64 `json:"bps_slowest"` - // BpsPct is BPS percentiles. - BpsPct [101]float64 `json:"bps_percentiles"` - - // FirstAccess is filled if the same object is accessed multiple times. - // This records the first touch of the object. - FirstAccess *RequestSizeRange `json:"first_access,omitempty"` - - // Time to first byte if applicable. - FirstByte *TTFB `json:"first_byte,omitempty"` + // Average payload size of requests in bytes. + AvgObjSize int `json:"avg_obj_size"` + // Maximum size in request size range (not included). + MaxSize int `json:"max_size"` + // Minimum size in request size range. + MinSize int `json:"min_size"` } func (r *RequestSizeRange) fill(s bench.SizeSegment) { @@ -170,12 +172,8 @@ func (r *RequestSizeRange) fillFirst(s bench.SizeSegment) { // MultiSizedRequests contains statistics when objects have the same different size. type MultiSizedRequests struct { - // Skipped if too little data. - Skipped bool `json:"skipped"` - // Total number of requests. - Requests int `json:"requests"` - // Average object size - AvgObjSize int64 `json:"avg_obj_size"` + // ByHost contains request information by host. + ByHost map[string]RequestSizeRange `json:"by_host,omitempty"` // BySize contains request times separated by sizes BySize []RequestSizeRange `json:"by_size"` @@ -183,8 +181,13 @@ type MultiSizedRequests struct { // HostNames are the host names, sorted. HostNames []string - // ByHost contains request information by host. - ByHost map[string]RequestSizeRange `json:"by_host,omitempty"` + // Total number of requests. + Requests int `json:"requests"` + // Average object size + AvgObjSize int64 `json:"avg_obj_size"` + + // Skipped if too little data. 
+ Skipped bool `json:"skipped"` } func (a *MultiSizedRequests) fill(ops bench.Operations) { diff --git a/pkg/aggregate/throughput.go b/pkg/aggregate/throughput.go index d2bb336e..a9a91119 100644 --- a/pkg/aggregate/throughput.go +++ b/pkg/aggregate/throughput.go @@ -27,22 +27,22 @@ import ( // Throughput contains throughput. type Throughput struct { - // Errors recorded. - Errors int `json:"errors"` - // Time period of the throughput measurement. - MeasureDurationMillis int `json:"measure_duration_millis"` // Start time of the measurement. StartTime time.Time `json:"start_time"` // End time of the measurement. EndTime time.Time `json:"end_time"` + // Time segmented throughput summary. + Segmented *ThroughputSegmented `json:"segmented,omitempty"` + // Errors recorded. + Errors int `json:"errors"` + // Time period of the throughput measurement. + MeasureDurationMillis int `json:"measure_duration_millis"` // Average bytes per second. Can be 0. AverageBPS float64 `json:"average_bps"` // Average operations per second. AverageOPS float64 `json:"average_ops"` // Number of full operations Operations int `json:"operations"` - // Time segmented throughput summary. - Segmented *ThroughputSegmented `json:"segmented,omitempty"` } // String returns a string representation of the segment @@ -56,7 +56,7 @@ func (t Throughput) StringDuration() string { } // StringDetails returns a detailed string representation of the segment -func (t Throughput) StringDetails(details bool) string { +func (t Throughput) StringDetails(_ bool) string { speed := "" if t.AverageBPS > 0 { speed = fmt.Sprintf("%.02f MiB/s, ", t.AverageBPS/(1<<20)) @@ -84,8 +84,12 @@ func (t *Throughput) fill(total bench.Segment) { // ThroughputSegmented contains time segmented throughput statics. type ThroughputSegmented struct { - // Time of each segment. - SegmentDurationMillis int `json:"segment_duration_millis"` + // Start time of fastest time segment. + FastestStart time.Time `json:"fastest_start"` + // 50% Median.... + MedianStart time.Time `json:"median_start"` + // Slowest ... + SlowestStart time.Time `json:"slowest_start"` // Will contain how segments are sorted. // Will be 'bps' (bytes per second) or 'ops' (objects per second). SortedBy string `json:"sorted_by"` @@ -93,20 +97,16 @@ type ThroughputSegmented struct { // All segments, sorted Segments []SegmentSmall `json:"segments"` - // Start time of fastest time segment. - FastestStart time.Time `json:"fastest_start"` + // Time of each segment. + SegmentDurationMillis int `json:"segment_duration_millis"` // Fastest segment bytes per second. Can be 0. In that case segments are sorted by operations per second. FastestBPS float64 `json:"fastest_bps"` // Fastest segment in terms of operations per second. FastestOPS float64 `json:"fastest_ops"` - // 50% Median.... - MedianStart time.Time `json:"median_start"` - MedianBPS float64 `json:"median_bps"` - MedianOPS float64 `json:"median_ops"` - // Slowest ... - SlowestStart time.Time `json:"slowest_start"` - SlowestBPS float64 `json:"slowest_bps"` - SlowestOPS float64 `json:"slowest_ops"` + MedianBPS float64 `json:"median_bps"` + MedianOPS float64 `json:"median_ops"` + SlowestBPS float64 `json:"slowest_bps"` + SlowestOPS float64 `json:"slowest_ops"` } // BPSorOPS returns bytes per second if non zero otherwise operations per second as human readable string. @@ -120,6 +120,8 @@ func BPSorOPS(bps, ops float64) string { // SegmentSmall represents a time segment of the run. // Length of the segment is defined elsewhere. 
type SegmentSmall struct { + // Start time of the segment. + Start time.Time `json:"start"` // Bytes per second during the time segment. BPS float64 `json:"bytes_per_sec"` @@ -128,9 +130,6 @@ type SegmentSmall struct { // Errors logged during the time segment. Errors int `json:"errors,omitempty"` - - // Start time of the segment. - Start time.Time `json:"start"` } // cloneBenchSegments clones benchmark segments to the simpler representation. diff --git a/pkg/bench/analyze.go b/pkg/bench/analyze.go index 90e9ba84..58abc97a 100644 --- a/pkg/bench/analyze.go +++ b/pkg/bench/analyze.go @@ -37,19 +37,19 @@ type SegmentOptions struct { // A Segment represents totals of operations in a specific time segment // starting at Start and ending before EndsBefore. type Segment struct { + EndsBefore time.Time `json:"ends_before"` + Start time.Time `json:"start"` OpType string `json:"op"` Host string `json:"host"` - ObjsPerOp int `json:"objects_per_op"` - TotalBytes int64 `json:"total_bytes"` - FullOps int `json:"full_ops"` - PartialOps int `json:"partial_ops"` OpsStarted int `json:"ops_started"` + PartialOps int `json:"partial_ops"` + FullOps int `json:"full_ops"` OpsEnded int `json:"ops_ended"` Objects float64 `json:"objects"` Errors int `json:"errors"` ReqAvg float64 `json:"req_avg_ms"` // Average duration of operations ending in segment. - Start time.Time `json:"start"` - EndsBefore time.Time `json:"ends_before"` + TotalBytes int64 `json:"total_bytes"` + ObjsPerOp int `json:"objects_per_op"` } // TTFB contains time to first byte stats. diff --git a/pkg/bench/benchmark.go b/pkg/bench/benchmark.go index 2ed4b7a3..c7e79841 100644 --- a/pkg/bench/benchmark.go +++ b/pkg/bench/benchmark.go @@ -47,41 +47,51 @@ type Benchmark interface { // Common contains common benchmark parameters. type Common struct { - Client func() (cl *minio.Client, done func()) - - Concurrency int - Source func() generator.Source - Bucket string - Location string - Locking bool - - // Running in client mode. - ClientMode bool - // Clear bucket before benchmark - Clear bool - PrepareProgress chan float64 - // Does destination support versioning? - Versioned bool - - // Auto termination is set when this is > 0. - AutoTermDur time.Duration - AutoTermScale float64 - // Default Put options. PutOpts minio.PutObjectOptions + PrepareProgress chan float64 + // Custom is returned to server if set by clients. Custom map[string]string + // ExtraFlags contains extra flags to add to remote clients. + ExtraFlags map[string]string + Source func() generator.Source + // Error should log an error similar to fmt.Print(data...) Error func(data ...interface{}) + Client func() (cl *minio.Client, done func()) + + Collector *Collector + + Location string + Bucket string + + // Auto termination is set when this is > 0. + AutoTermDur time.Duration + // ClientIdx is the client index. // Will be 0 if single client. ClientIdx int - // ExtraFlags contains extra flags to add to remote clients. - ExtraFlags map[string]string + AutoTermScale float64 + + Concurrency int + + // Running in client mode. + ClientMode bool + Locking bool + + // Clear bucket before benchmark + Clear bool + + // DiscardOutput output. + DiscardOutput bool // indicates if we prefer a terse output useful in lengthy runs + + // Does destination support versioning? 
+ Versioned bool } const ( @@ -224,3 +234,11 @@ func (c *Common) prepareProgress(progress float64) { default: } } + +func (c *Common) addCollector() { + if c.DiscardOutput { + c.Collector = NewNullCollector() + } else { + c.Collector = NewCollector() + } +} diff --git a/pkg/bench/collector.go b/pkg/bench/collector.go new file mode 100644 index 00000000..e0dcc09a --- /dev/null +++ b/pkg/bench/collector.go @@ -0,0 +1,157 @@ +/* + * Warp (C) 2019-2023 MinIO, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package bench + +import ( + "context" + "math" + "sync" + "time" + + "github.com/minio/pkg/console" +) + +type Collector struct { + rcv chan Operation + ops Operations + rcvWg sync.WaitGroup + // The mutex protects the ops above. + // Once ops have been added, they should no longer be modified. + opsMu sync.Mutex +} + +func NewCollector() *Collector { + r := &Collector{ + ops: make(Operations, 0, 10000), + rcv: make(chan Operation, 1000), + } + r.rcvWg.Add(1) + go func() { + defer r.rcvWg.Done() + for op := range r.rcv { + r.opsMu.Lock() + r.ops = append(r.ops, op) + r.opsMu.Unlock() + } + }() + return r +} + +// NewNullCollector collects operations, but discards them. +func NewNullCollector() *Collector { + r := &Collector{ + ops: make(Operations, 0), + rcv: make(chan Operation, 1000), + } + r.rcvWg.Add(1) + go func() { + defer r.rcvWg.Done() + // https://github.com/mgechev/revive/issues/386 + //nolint:revive + for range r.rcv { + // Just dump them + } + }() + return r +} + +// AutoTerm will check if throughput is within 'threshold' (0 -> ) for wantSamples, +// when the current operations are split into 'splitInto' segments. +// The minimum duration for the calculation can be set as well. +// Segment splitting may cause less than this duration to be used. +func (c *Collector) AutoTerm(ctx context.Context, op string, threshold float64, wantSamples, splitInto int, minDur time.Duration) context.Context { + if wantSamples >= splitInto { + panic("wantSamples >= splitInto") + } + if splitInto == 0 { + panic("splitInto == 0 ") + } + ctx, cancel := context.WithCancel(ctx) + go func() { + defer cancel() + ticker := time.NewTicker(time.Second) + + checkloop: + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + } + // Time to check if we should terminate. + c.opsMu.Lock() + // copies + ops := c.ops.FilterByOp(op) + c.opsMu.Unlock() + start, end := ops.ActiveTimeRange(true) + if end.Sub(start) <= minDur*time.Duration(splitInto)/time.Duration(wantSamples) { + // We don't have enough. + continue + } + segs := ops.Segment(SegmentOptions{ + From: start, + PerSegDuration: end.Sub(start) / time.Duration(splitInto), + AllThreads: true, + }) + if len(segs) < wantSamples { + continue + } + // Use last segment as our base. + mb, _, objs := segs[len(segs)-1].SpeedPerSec() + // Only use the segments we are interested in. 
+ segs = segs[len(segs)-wantSamples : len(segs)-1] + for _, seg := range segs { + segMB, _, segObjs := seg.SpeedPerSec() + if mb > 0 { + if math.Abs(mb-segMB) > threshold*mb { + continue checkloop + } + continue + } + if math.Abs(objs-segObjs) > threshold*objs { + continue checkloop + } + } + // All checks passed. + if mb > 0 { + console.Eraseline() + console.Printf("\rThroughput %0.01fMiB/s within %f%% for %v. Assuming stability. Terminating benchmark.\n", + mb, threshold*100, + segs[0].Duration().Round(time.Millisecond)*time.Duration(len(segs)+1)) + } else { + console.Eraseline() + console.Printf("\rThroughput %0.01f objects/s within %f%% for %v. Assuming stability. Terminating benchmark.\n", + objs, threshold*100, + segs[0].Duration().Round(time.Millisecond)*time.Duration(len(segs)+1)) + } + return + } + }() + return ctx +} + +func (c *Collector) Receiver() chan<- Operation { + return c.rcv +} + +func (c *Collector) Close() Operations { + close(c.rcv) + c.rcvWg.Wait() + return c.ops +} diff --git a/pkg/bench/compare.go b/pkg/bench/compare.go index a96e5c00..7bd7edc6 100644 --- a/pkg/bench/compare.go +++ b/pkg/bench/compare.go @@ -26,15 +26,14 @@ import ( // Comparison is a comparison between two benchmarks. type Comparison struct { - Op string - TTFB *TTFBCmp - Reqs CmpReqs + Op string Average CmpSegment Fastest CmpSegment Median CmpSegment Slowest CmpSegment + Reqs CmpReqs } // CmpSegment is s comparisons between two segments. diff --git a/pkg/bench/delete.go b/pkg/bench/delete.go index 5106ff1f..fcdc6a00 100644 --- a/pkg/bench/delete.go +++ b/pkg/bench/delete.go @@ -32,12 +32,11 @@ import ( // Delete benchmarks delete speed. type Delete struct { + Common + objects generator.Objects + CreateObjects int BatchSize int - Collector *Collector - objects generator.Objects - - Common } // Prepare will create an empty bucket or delete any content already there @@ -51,7 +50,7 @@ func (d *Delete) Prepare(ctx context.Context) error { console.Info("\rUploading ", d.CreateObjects, " objects of ", src.String()) var wg sync.WaitGroup wg.Add(d.Concurrency) - d.Collector = NewCollector() + d.addCollector() obj := make(chan struct{}, d.CreateObjects) for i := 0; i < d.CreateObjects; i++ { obj <- struct{}{} @@ -83,6 +82,7 @@ func (d *Delete) Prepare(ctx context.Context) error { ObjPerOp: 1, Endpoint: client.EndpointURL().String(), } + opts.ContentType = obj.ContentType op.Start = time.Now() res, err := client.PutObject(ctx, d.Bucket, obj.Name, obj.Reader, obj.Size, opts) @@ -186,6 +186,10 @@ func (d *Delete) Start(ctx context.Context, wait chan struct{}) (Operations, err ObjPerOp: len(objs), Endpoint: client.EndpointURL().String(), } + if d.DiscardOutput { + op.File = "" + } + op.Start = time.Now() // RemoveObjectsWithContext will split any batches > 1000 into separate requests. errCh := client.RemoveObjects(nonTerm, d.Bucket, objects, minio.RemoveObjectsOptions{}) diff --git a/pkg/bench/get.go b/pkg/bench/get.go index 79e7fe41..23eb9bae 100644 --- a/pkg/bench/get.go +++ b/pkg/bench/get.go @@ -33,18 +33,18 @@ import ( // Get benchmarks download speed. type Get struct { - CreateObjects int - RandomRanges bool - Collector *Collector + Common + + // Default Get options. + GetOpts minio.GetObjectOptions + ListPrefix string + objects generator.Objects + CreateObjects int Versions int + RandomRanges bool ListExisting bool ListFlat bool - ListPrefix string - - // Default Get options. 
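The new pkg/bench/collector.go above moves the Collector out of ops.go and adds NewNullCollector, which keeps draining the receive channel but drops every Operation; Common.addCollector selects it when the --stress flag sets DiscardOutput, so long stress runs do not accumulate results in memory. A stripped-down sketch of that drain-and-discard pattern, assuming nothing beyond the standard library:

package main

import (
	"fmt"
	"sync"
)

// op stands in for bench.Operation.
type op struct{ size int64 }

type collector struct {
	rcv  chan op
	ops  []op
	wg   sync.WaitGroup
	keep bool
}

// newCollector drains rcv in the background; when keep is false it
// behaves like warp's NewNullCollector and discards everything received.
func newCollector(keep bool) *collector {
	c := &collector{rcv: make(chan op, 1000), keep: keep}
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		for o := range c.rcv {
			if c.keep {
				c.ops = append(c.ops, o)
			}
		}
	}()
	return c
}

func (c *collector) Receiver() chan<- op { return c.rcv }

// Close stops the drain goroutine and returns whatever was kept.
func (c *collector) Close() []op {
	close(c.rcv)
	c.wg.Wait()
	return c.ops
}

func main() {
	for _, discard := range []bool{false, true} {
		c := newCollector(!discard)
		for i := 0; i < 5; i++ {
			c.Receiver() <- op{size: int64(i)}
		}
		fmt.Println("kept:", len(c.Close()))
	}
}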
- GetOpts minio.GetObjectOptions - Common } // Prepare will create an empty bucket or delete any content already there @@ -141,7 +141,8 @@ func (g *Get) Prepare(ctx context.Context) error { var wg sync.WaitGroup wg.Add(g.Concurrency) - g.Collector = NewCollector() + g.addCollector() + obj := make(chan struct{}, g.CreateObjects) for i := 0; i < g.CreateObjects; i++ { obj <- struct{}{} @@ -179,6 +180,7 @@ func (g *Get) Prepare(ctx context.Context) error { ObjPerOp: 1, Endpoint: client.EndpointURL().String(), } + opts.ContentType = obj.ContentType op.Start = time.Now() res, err := client.PutObject(ctx, g.Bucket, obj.Name, obj.Reader, obj.Size, opts) @@ -276,6 +278,10 @@ func (g *Get) Start(ctx context.Context, wait chan struct{}) (Operations, error) ObjPerOp: 1, Endpoint: client.EndpointURL().String(), } + if g.DiscardOutput { + op.File = "" + } + if g.RandomRanges && op.Size > 2 { // Randomize length similar to --obj.randsize size := generator.GetExpRandSize(rng, 0, op.Size-2) diff --git a/pkg/bench/list.go b/pkg/bench/list.go index 82446eb7..8806439c 100644 --- a/pkg/bench/list.go +++ b/pkg/bench/list.go @@ -33,14 +33,14 @@ import ( // List benchmarks listing speed. type List struct { + Common + Collector *Collector + objects []generator.Objects + CreateObjects int + Versions int NoPrefix bool - Collector *Collector Metadata bool - Versions int - objects []generator.Objects - - Common } // Prepare will create an empty bucket or delete any content already there @@ -74,7 +74,7 @@ func (d *List) Prepare(ctx context.Context) error { } var wg sync.WaitGroup wg.Add(d.Concurrency) - d.Collector = NewCollector() + d.addCollector() d.objects = make([]generator.Objects, d.Concurrency) var mu sync.Mutex objsCreated := 0 @@ -118,6 +118,7 @@ func (d *List) Prepare(ctx context.Context) error { ObjPerOp: 1, Endpoint: client.EndpointURL().String(), } + opts.ContentType = obj.ContentType op.Start = time.Now() res, err := client.PutObject(ctx, d.Bucket, obj.Name, obj.Reader, obj.Size, opts) @@ -206,6 +207,7 @@ func (d *List) Start(ctx context.Context, wait chan struct{}) (Operations, error Size: 0, Endpoint: client.EndpointURL().String(), } + op.Start = time.Now() // List all objects with prefix diff --git a/pkg/bench/mixed.go b/pkg/bench/mixed.go index 7a27eeb9..af84feeb 100644 --- a/pkg/bench/mixed.go +++ b/pkg/bench/mixed.go @@ -34,13 +34,13 @@ import ( // Mixed benchmarks mixed operations all inclusive. type Mixed struct { - CreateObjects int - Collector *Collector - Dist *MixedDistribution - - GetOpts minio.GetObjectOptions - StatOpts minio.StatObjectOptions Common + Collector *Collector + Dist *MixedDistribution + + GetOpts minio.GetObjectOptions + StatOpts minio.StatObjectOptions + CreateObjects int } // MixedDistribution keeps track of operation distribution @@ -48,10 +48,11 @@ type Mixed struct { type MixedDistribution struct { // Operation -> distribution. 
Distribution map[string]float64 - ops []string objects map[string]generator.Object rng *rand.Rand + ops []string + current int mu sync.Mutex } @@ -161,7 +162,7 @@ func (g *Mixed) Prepare(ctx context.Context) error { console.Info("\rUploading ", g.CreateObjects, " objects of ", src.String()) var wg sync.WaitGroup wg.Add(g.Concurrency) - g.Collector = NewCollector() + g.addCollector() obj := make(chan struct{}, g.CreateObjects) for i := 0; i < g.CreateObjects; i++ { obj <- struct{}{} @@ -261,6 +262,7 @@ func (g *Mixed) Start(ctx context.Context, wait chan struct{}) (Operations, erro ObjPerOp: 1, Endpoint: client.EndpointURL().String(), } + op.Start = time.Now() var err error getOpts.VersionID = obj.VersionID @@ -335,6 +337,7 @@ func (g *Mixed) Start(ctx context.Context, wait chan struct{}) (Operations, erro ObjPerOp: 1, Endpoint: client.EndpointURL().String(), } + op.Start = time.Now() err := client.RemoveObject(nonTerm, g.Bucket, obj.Name, minio.RemoveObjectOptions{VersionID: obj.VersionID}) op.End = time.Now() diff --git a/pkg/bench/multipart.go b/pkg/bench/multipart.go index da018da0..42b1338f 100644 --- a/pkg/bench/multipart.go +++ b/pkg/bench/multipart.go @@ -35,16 +35,16 @@ import ( // Multipart benchmarks multipart upload+download speed. type Multipart struct { - CreateParts int - PartStart int - ObjName string - Collector *Collector - objects generator.Objects - UploadID string + Common // Default Get options. - GetOpts minio.GetObjectOptions - Common + GetOpts minio.GetObjectOptions + ObjName string + UploadID string + + objects generator.Objects + CreateParts int + PartStart int } // InitOnce will be run once @@ -79,7 +79,7 @@ func (g *Multipart) Prepare(ctx context.Context) error { var wg sync.WaitGroup wg.Add(g.Concurrency) - g.Collector = NewCollector() + g.addCollector() obj := make(chan int, g.CreateParts) for i := 0; i < g.CreateParts; i++ { obj <- i + g.PartStart @@ -118,6 +118,10 @@ func (g *Multipart) Prepare(ctx context.Context) error { ObjPerOp: 1, Endpoint: client.EndpointURL().String(), } + if g.DiscardOutput { + op.File = "" + } + opts.ContentType = obj.ContentType mpopts := minio.PutObjectPartOptions{ SSE: g.Common.PutOpts.ServerSideEncryption, @@ -222,6 +226,7 @@ func (g *Multipart) Start(ctx context.Context, wait chan struct{}) (Operations, ObjPerOp: 1, Endpoint: client.EndpointURL().String(), } + op.Start = time.Now() opts.PartNumber = part o, err := client.GetObject(nonTerm, g.Bucket, obj.Name, opts) diff --git a/pkg/bench/ops.go b/pkg/bench/ops.go index d2ee155c..4c5f9114 100644 --- a/pkg/bench/ops.go +++ b/pkg/bench/ops.go @@ -19,7 +19,6 @@ package bench import ( "bufio" - "context" "encoding/csv" "fmt" "io" @@ -27,7 +26,6 @@ import ( "sort" "strconv" "strings" - "sync" "time" "github.com/dustin/go-humanize" @@ -37,129 +35,17 @@ import ( type Operations []Operation type Operation struct { - OpType string `json:"type"` - ObjPerOp int `json:"ops"` Start time.Time `json:"start"` - FirstByte *time.Time `json:"first_byte"` End time.Time `json:"end"` + FirstByte *time.Time `json:"first_byte"` + OpType string `json:"type"` Err string `json:"err"` - Size int64 `json:"size"` - File string `json:"file"` - Thread uint16 `json:"thread"` + File string `json:"file,omitempty"` ClientID string `json:"client_id"` Endpoint string `json:"endpoint"` -} - -type Collector struct { - ops Operations - // The mutex protects the ops above. - // Once ops have been added, they should no longer be modified. 
- opsMu sync.Mutex - rcv chan Operation - rcvWg sync.WaitGroup -} - -func NewCollector() *Collector { - r := &Collector{ - ops: make(Operations, 0, 10000), - rcv: make(chan Operation, 1000), - } - r.rcvWg.Add(1) - go func() { - defer r.rcvWg.Done() - for op := range r.rcv { - r.opsMu.Lock() - r.ops = append(r.ops, op) - r.opsMu.Unlock() - } - }() - return r -} - -// AutoTerm will check if throughput is within 'threshold' (0 -> ) for wantSamples, -// when the current operations are split into 'splitInto' segments. -// The minimum duration for the calculation can be set as well. -// Segment splitting may cause less than this duration to be used. -func (c *Collector) AutoTerm(ctx context.Context, op string, threshold float64, wantSamples, splitInto int, minDur time.Duration) context.Context { - if wantSamples >= splitInto { - panic("wantSamples >= splitInto") - } - if splitInto == 0 { - panic("splitInto == 0 ") - } - ctx, cancel := context.WithCancel(ctx) - go func() { - defer cancel() - ticker := time.NewTicker(time.Second) - - checkloop: - for { - select { - case <-ctx.Done(): - ticker.Stop() - return - case <-ticker.C: - } - // Time to check if we should terminate. - c.opsMu.Lock() - // copies - ops := c.ops.FilterByOp(op) - c.opsMu.Unlock() - start, end := ops.ActiveTimeRange(true) - if end.Sub(start) <= minDur*time.Duration(splitInto)/time.Duration(wantSamples) { - // We don't have enough. - continue - } - segs := ops.Segment(SegmentOptions{ - From: start, - PerSegDuration: end.Sub(start) / time.Duration(splitInto), - AllThreads: true, - }) - if len(segs) < wantSamples { - continue - } - // Use last segment as our base. - mb, _, objs := segs[len(segs)-1].SpeedPerSec() - // Only use the segments we are interested in. - segs = segs[len(segs)-wantSamples : len(segs)-1] - for _, seg := range segs { - segMB, _, segObjs := seg.SpeedPerSec() - if mb > 0 { - if math.Abs(mb-segMB) > threshold*mb { - continue checkloop - } - continue - } - if math.Abs(objs-segObjs) > threshold*objs { - continue checkloop - } - } - // All checks passed. - if mb > 0 { - console.Eraseline() - console.Printf("\rThroughput %0.01fMiB/s within %f%% for %v. Assuming stability. Terminating benchmark.\n", - mb, threshold*100, - segs[0].Duration().Round(time.Millisecond)*time.Duration(len(segs)+1)) - } else { - console.Eraseline() - console.Printf("\rThroughput %0.01f objects/s within %f%% for %v. Assuming stability. Terminating benchmark.\n", - objs, threshold*100, - segs[0].Duration().Round(time.Millisecond)*time.Duration(len(segs)+1)) - } - return - } - }() - return ctx -} - -func (c *Collector) Receiver() chan<- Operation { - return c.rcv -} - -func (c *Collector) Close() Operations { - close(c.rcv) - c.rcvWg.Wait() - return c.ops + ObjPerOp int `json:"ops"` + Size int64 `json:"size"` + Thread uint16 `json:"thread"` } // Duration returns the duration o.End-o.Start @@ -649,11 +535,11 @@ func (o Operations) StdDev() time.Duration { // SizeSegment is a size segment. type SizeSegment struct { + Ops Operations Smallest int64 SmallestLog10 int Biggest int64 BiggestLog10 int - Ops Operations } // SizeString returns the size as a string. 
@@ -1038,6 +924,7 @@ func (o Operations) CSV(w io.Writer, comment string) error { if err != nil { return err } + for i, op := range o { var ttfb string if op.FirstByte != nil { diff --git a/pkg/bench/put.go b/pkg/bench/put.go index d9ffa2c3..7f9cee01 100644 --- a/pkg/bench/put.go +++ b/pkg/bench/put.go @@ -41,7 +41,8 @@ func (u *Put) Prepare(ctx context.Context) error { func (u *Put) Start(ctx context.Context, wait chan struct{}) (Operations, error) { var wg sync.WaitGroup wg.Add(u.Concurrency) - c := NewCollector() + u.addCollector() + c := u.Collector if u.AutoTermDur > 0 { ctx = c.AutoTerm(ctx, http.MethodPut, u.AutoTermScale, autoTermCheck, autoTermSamples, u.AutoTermDur) } @@ -73,10 +74,11 @@ func (u *Put) Start(ctx context.Context, wait chan struct{}) (Operations, error) OpType: http.MethodPut, Thread: uint16(i), Size: obj.Size, - File: obj.Name, ObjPerOp: 1, + File: obj.Name, Endpoint: client.EndpointURL().String(), } + op.Start = time.Now() res, err := client.PutObject(nonTerm, u.Bucket, obj.Name, obj.Reader, obj.Size, opts) op.End = time.Now() @@ -105,7 +107,7 @@ func (u *Put) Start(ctx context.Context, wait chan struct{}) (Operations, error) // Cleanup deletes everything uploaded to the bucket. func (u *Put) Cleanup(ctx context.Context) { - var pf []string + pf := make([]string, 0, len(u.prefixes)) for p := range u.prefixes { pf = append(pf, p) } diff --git a/pkg/bench/retention.go b/pkg/bench/retention.go index 4658b203..1e25b3e2 100644 --- a/pkg/bench/retention.go +++ b/pkg/bench/retention.go @@ -32,12 +32,11 @@ import ( // Retention benchmarks download speed. type Retention struct { + Common + objects generator.Objects + CreateObjects int Versions int - Collector *Collector - objects generator.Objects - - Common } // Prepare will create an empty bucket or delete any content already there @@ -61,7 +60,7 @@ func (g *Retention) Prepare(ctx context.Context) error { console.Info("\rUploading ", g.CreateObjects, " objects with ", g.Versions, " versions each of ", src.String()) var wg sync.WaitGroup wg.Add(g.Concurrency) - g.Collector = NewCollector() + g.addCollector() obj := make(chan struct{}, g.CreateObjects) for i := 0; i < g.CreateObjects; i++ { obj <- struct{}{} @@ -99,6 +98,7 @@ func (g *Retention) Prepare(ctx context.Context) error { ObjPerOp: 1, Endpoint: client.EndpointURL().String(), } + opts.ContentType = obj.ContentType op.Start = time.Now() res, err := client.PutObject(ctx, g.Bucket, obj.Name, obj.Reader, obj.Size, opts) diff --git a/pkg/bench/s3zip.go b/pkg/bench/s3zip.go index b829e18d..8e23816b 100644 --- a/pkg/bench/s3zip.go +++ b/pkg/bench/s3zip.go @@ -35,12 +35,11 @@ import ( // S3Zip benchmarks download from a zip file. type S3Zip struct { - CreateFiles int - ZipObjName string - Collector *Collector - objects generator.Objects - Common + ZipObjName string + objects generator.Objects + + CreateFiles int } // Prepare will create an empty bucket or delete any content already there @@ -50,7 +49,7 @@ func (g *S3Zip) Prepare(ctx context.Context) error { return err } - g.Collector = NewCollector() + g.addCollector() src := g.Source() console.Eraseline() console.Info("\rUploading", g.ZipObjName, "with ", g.CreateFiles, " files each of ", src.String()) diff --git a/pkg/bench/select.go b/pkg/bench/select.go index c785e4fa..28bed1dd 100644 --- a/pkg/bench/select.go +++ b/pkg/bench/select.go @@ -33,13 +33,14 @@ import ( // Select benchmarks download speed. 
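The Cleanup changes in put.go above (and in snowball.go below) replace var pf []string with pf := make([]string, 0, len(u.prefixes)), which is what the newly enabled prealloc linter asks for: when the final length is known up front, pre-sizing the slice avoids the repeated grow-and-copy that a bare append performs. A trivial illustration (the prefixes are made up):

package main

import "fmt"

func main() {
	prefixes := map[string]struct{}{"a/": {}, "b/": {}, "c/": {}}

	// Without a capacity hint, append may reallocate as the slice grows.
	var grown []string
	for p := range prefixes {
		grown = append(grown, p)
	}

	// With the length known, a single allocation is enough.
	pre := make([]string, 0, len(prefixes))
	for p := range prefixes {
		pre = append(pre, p)
	}

	fmt.Println(len(grown), len(pre), cap(pre))
}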
type Select struct { - CreateObjects int - Collector *Collector - objects generator.Objects + Common + Collector *Collector // Default Select options. SelectOpts minio.SelectObjectOptions - Common + objects generator.Objects + + CreateObjects int } // Prepare will create an empty bucket or delete any content already there @@ -53,7 +54,7 @@ func (g *Select) Prepare(ctx context.Context) error { console.Info("\rUploading ", g.CreateObjects, " objects of ", src.String()) var wg sync.WaitGroup wg.Add(g.Concurrency) - g.Collector = NewCollector() + g.addCollector() obj := make(chan struct{}, g.CreateObjects) for i := 0; i < g.CreateObjects; i++ { obj <- struct{}{} @@ -85,6 +86,7 @@ func (g *Select) Prepare(ctx context.Context) error { ObjPerOp: 1, Endpoint: client.EndpointURL().String(), } + opts.ContentType = obj.ContentType op.Start = time.Now() res, err := client.PutObject(ctx, g.Bucket, obj.Name, obj.Reader, obj.Size, opts) @@ -163,6 +165,7 @@ func (g *Select) Start(ctx context.Context, wait chan struct{}) (Operations, err ObjPerOp: 1, Endpoint: client.EndpointURL().String(), } + op.Start = time.Now() var err error o, err := client.SelectObjectContent(nonTerm, g.Bucket, obj.Name, opts) diff --git a/pkg/bench/snowball.go b/pkg/bench/snowball.go index 3022c2ce..7c826b2a 100644 --- a/pkg/bench/snowball.go +++ b/pkg/bench/snowball.go @@ -34,15 +34,14 @@ import ( // Snowball benchmarks snowball upload speed. type Snowball struct { - NumObjs int // Number objects in each snowball. + Common + prefixes map[string]struct{} + + enc []*zstd.Encoder + NumObjs int // Number objects in each snowball. + WindowSize int Duplicate bool // Duplicate object content. Compress bool // Zstandard compress snowball. - Collector *Collector - WindowSize int - prefixes map[string]struct{} - - enc []*zstd.Encoder - Common } // Prepare will create an empty bucket or delete any content already there @@ -58,11 +57,9 @@ func (s *Snowball) Prepare(ctx context.Context) error { } } } + s.addCollector() s.prefixes = make(map[string]struct{}, s.Concurrency) - if err := s.createEmptyBucket(ctx); err != nil { - return err - } - return nil + return s.createEmptyBucket(ctx) } // Start will execute the main benchmark. @@ -70,7 +67,7 @@ func (s *Snowball) Prepare(ctx context.Context) error { func (s *Snowball) Start(ctx context.Context, wait chan struct{}) (Operations, error) { var wg sync.WaitGroup wg.Add(s.Concurrency) - c := NewCollector() + c := s.Collector if s.AutoTermDur > 0 { ctx = c.AutoTerm(ctx, http.MethodPut, s.AutoTermScale, autoTermCheck, autoTermSamples, s.AutoTermDur) } @@ -110,6 +107,7 @@ func (s *Snowball) Start(ctx context.Context, wait chan struct{}) (Operations, e File: path.Join(obj.Prefix, "snowball.tar"), ObjPerOp: s.NumObjs, } + { tw := tar.NewWriter(w) content, err := io.ReadAll(obj.Reader) @@ -193,7 +191,7 @@ func (s *Snowball) Cleanup(ctx context.Context) { s.enc[i] = nil } } - var pf []string + pf := make([]string, 0, len(s.prefixes)) for p := range s.prefixes { pf = append(pf, p) } diff --git a/pkg/bench/stat.go b/pkg/bench/stat.go index 342c90fe..d17543cb 100644 --- a/pkg/bench/stat.go +++ b/pkg/bench/stat.go @@ -32,14 +32,13 @@ import ( // Stat benchmarks HEAD speed. type Stat struct { - CreateObjects int - Collector *Collector - objects generator.Objects - Versions int + Common // Default Stat options. 
- StatOpts minio.StatObjectOptions - Common + StatOpts minio.StatObjectOptions + objects generator.Objects + CreateObjects int + Versions int } // Prepare will create an empty bucket or delete any content already there @@ -68,7 +67,7 @@ func (g *Stat) Prepare(ctx context.Context) error { var wg sync.WaitGroup wg.Add(g.Concurrency) - g.Collector = NewCollector() + g.addCollector() obj := make(chan struct{}, g.CreateObjects) for i := 0; i < g.CreateObjects; i++ { obj <- struct{}{} @@ -106,6 +105,7 @@ func (g *Stat) Prepare(ctx context.Context) error { ObjPerOp: 1, Endpoint: client.EndpointURL().String(), } + opts.ContentType = obj.ContentType op.Start = time.Now() res, err := client.PutObject(ctx, g.Bucket, obj.Name, obj.Reader, obj.Size, opts) @@ -183,6 +183,7 @@ func (g *Stat) Start(ctx context.Context, wait chan struct{}) (Operations, error ObjPerOp: 1, Endpoint: client.EndpointURL().String(), } + op.Start = time.Now() var err error if g.Versions > 1 { diff --git a/pkg/bench/versioned.go b/pkg/bench/versioned.go index 707af2a8..48153c9e 100644 --- a/pkg/bench/versioned.go +++ b/pkg/bench/versioned.go @@ -35,13 +35,12 @@ import ( // Versioned benchmarks mixed operations all inclusive. type Versioned struct { - CreateObjects int - Collector *Collector - Dist *VersionedDistribution - - GetOpts minio.GetObjectOptions - StatOpts minio.StatObjectOptions Common + Dist *VersionedDistribution + + GetOpts minio.GetObjectOptions + StatOpts minio.StatObjectOptions + CreateObjects int } // Prepare will create an empty bucket or delete any content already there @@ -67,7 +66,7 @@ func (g *Versioned) Prepare(ctx context.Context) error { console.Info("\rUploading ", g.CreateObjects, " objects of ", src.String()) var wg sync.WaitGroup wg.Add(g.Concurrency) - g.Collector = NewCollector() + g.addCollector() obj := make(chan struct{}, g.CreateObjects) for i := 0; i < g.CreateObjects; i++ { obj <- struct{}{} @@ -166,6 +165,7 @@ func (g *Versioned) Start(ctx context.Context, wait chan struct{}) (Operations, ObjPerOp: 1, Endpoint: client.EndpointURL().String(), } + op.Start = time.Now() var err error getOpts.VersionID = obj.VersionID @@ -205,6 +205,7 @@ func (g *Versioned) Start(ctx context.Context, wait chan struct{}) (Operations, ObjPerOp: 1, Endpoint: client.EndpointURL().String(), } + op.Start = time.Now() res, err := client.PutObject(nonTerm, g.Bucket, obj.Name, obj.Reader, obj.Size, putOpts) op.End = time.Now() @@ -300,10 +301,11 @@ type versionedObj struct { type VersionedDistribution struct { // Operation -> distribution. Distribution map[string]float64 - ops []string objects map[string]versionedObj rng *rand.Rand + ops []string + current int mu sync.Mutex } diff --git a/pkg/generator/csv.go b/pkg/generator/csv.go index 52d1ea36..c6134456 100644 --- a/pkg/generator/csv.go +++ b/pkg/generator/csv.go @@ -83,10 +83,10 @@ func (o CsvOpts) RngSeed(s int64) CsvOpts { // CsvOpts provides options for CSV generation. type CsvOpts struct { err error - cols, rows int - comma byte seed *int64 + cols, rows int minLen, maxLen int + comma byte } func csvOptsDefaults() CsvOpts { @@ -102,13 +102,14 @@ func csvOptsDefaults() CsvOpts { } type csvSource struct { - o Options - buf *circularBuffer - builder []byte - obj Object + buf *circularBuffer // We may need a faster RNG for this... 
rng *rand.Rand + obj Object + + o Options + builder []byte } func newCsv(o Options) (Source, error) { diff --git a/pkg/generator/generator.go b/pkg/generator/generator.go index b2e6564f..898d532d 100644 --- a/pkg/generator/generator.go +++ b/pkg/generator/generator.go @@ -54,12 +54,12 @@ type Object struct { // Corresponding mime type ContentType string - // Size of the object to expect. - Size int64 - Prefix string VersionID string + + // Size of the object to expect. + Size int64 } // Objects is a slice of objects. diff --git a/pkg/generator/options.go b/pkg/generator/options.go index c507739b..749c9822 100644 --- a/pkg/generator/options.go +++ b/pkg/generator/options.go @@ -26,13 +26,13 @@ import ( // Use WithXXX functions to set them. type Options struct { src func(o Options) (Source, error) - minSize int64 - totalSize int64 - randSize bool customPrefix string - csv CsvOpts random RandomOpts + csv CsvOpts + minSize int64 + totalSize int64 randomPrefix int + randSize bool } // OptionApplier allows to abstract generator options. diff --git a/pkg/generator/random.go b/pkg/generator/random.go index 3f28b0a1..5a20dc40 100644 --- a/pkg/generator/random.go +++ b/pkg/generator/random.go @@ -76,11 +76,11 @@ func randomOptsDefaults() RandomOpts { } type randomSrc struct { - counter uint64 - o Options buf *scrambler rng *rand.Rand obj Object + o Options + counter uint64 } func newRandom(o Options) (Source, error) { diff --git a/pkg/generator/scambler.go b/pkg/generator/scambler.go index 621cf61f..2d8881fc 100644 --- a/pkg/generator/scambler.go +++ b/pkg/generator/scambler.go @@ -29,12 +29,12 @@ import ( ) type scrambler struct { + // Data source + stream *sio.EncReader // The total number of bytes to return want int64 // Number of bytes read read int64 - // Data source - stream *sio.EncReader } // Reset will reset the scrambler. diff --git a/staticcheck.conf b/staticcheck.conf deleted file mode 100644 index fec5f6f3..00000000 --- a/staticcheck.conf +++ /dev/null @@ -1 +0,0 @@ -checks = ["all", "-ST1005", "-ST1000", "-SA4000", "-SA9004", "-SA1019", "-SA1008", "-U1000", "-ST1016"]