Skip to content

Commit

Permalink
simplify warp cleanup stage (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
harshavardhana authored Jul 27, 2021
1 parent 76801ae commit b3eac9f
Showing 1 changed file with 31 additions and 51 deletions.
82 changes: 31 additions & 51 deletions pkg/bench/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"fmt"
"math"
"sync"
"time"

"github.com/minio/minio-go/v7"
Expand Down Expand Up @@ -136,59 +135,40 @@ func (c *Common) deleteAllInBucket(ctx context.Context, prefixes ...string) {
if len(prefixes) == 0 {
prefixes = []string{""}
}
var wg sync.WaitGroup
wg.Add(len(prefixes))
for _, prefix := range prefixes {
go func(prefix string) {
defer wg.Done()

doneCh := make(chan struct{})
defer close(doneCh)
cl, done := c.Client()
defer done()
remove := make(chan minio.ObjectInfo, 1000)
errCh := cl.RemoveObjects(ctx, c.Bucket, remove, minio.RemoveObjectsOptions{})
defer func() {
// Signal we are done
close(remove)
// Wait for deletes to finish
err := <-errCh
if err.Err != nil {
c.Error(err.Err)
}
}()

objects := cl.ListObjects(ctx, c.Bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: true, WithVersions: c.Versioned})
for {
select {
case obj, ok := <-objects:
if !ok {
return
}
if obj.Err != nil {
c.Error(obj.Err)
continue
}
sendNext:
for {
select {
case remove <- minio.ObjectInfo{
Key: obj.Key,
VersionID: obj.VersionID,
}:
break sendNext
case err := <-errCh:
c.Error(err)
}
}
case err := <-errCh:
c.Error(err)

doneCh := make(chan struct{})
defer close(doneCh)

cl, done := c.Client()
defer done()

objectsCh := make(chan minio.ObjectInfo)
go func() {
defer close(objectsCh)
opts := minio.ListObjectsOptions{
Recursive: true,
WithVersions: c.Versioned,
}
for _, prefix := range prefixes {
opts.Prefix = prefix
for object := range cl.ListObjects(ctx, c.Bucket, opts) {
if object.Err != nil {
c.Error(object.Err)
return
}
objectsCh <- object
}
}(prefix)
}
wg.Wait()
console.Infof("\rClearing Prefix %/%q...", c.Bucket, prefix)
}
}()

errCh := cl.RemoveObjects(ctx, c.Bucket, objectsCh, minio.RemoveObjectsOptions{})
for err := range errCh {
if err.Err != nil {
c.Error(err.Err)
continue
}
}
}

// prepareProgress updates preparation progess with the value 0->1.
Expand Down

0 comments on commit b3eac9f

Please sign in to comment.