object/gs: use clients pool to optimize gs performance (#5020)
zhijian-pro authored Jul 25, 2024
1 parent 26a877f commit b7d6278
Showing 1 changed file with 48 additions and 14 deletions.
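Summary: previously every request in the gs backend went through a single storage.Client; this commit keeps a small pool of clients and picks one per request with an atomic round-robin counter, so concurrent requests are spread across clients and their underlying connections. Below is a minimal, runnable sketch of that selection pattern (the pool type and the string stand-ins for *storage.Client are illustrative, not from this commit):

// Toy demonstration of the round-robin selection used by getClient in the
// diff below; strings stand in for *storage.Client so it runs anywhere.
package main

import (
	"fmt"
	"sync/atomic"
)

type pool struct {
	clients []string // stands in for []*storage.Client
	index   uint64
}

func (p *pool) pick() string {
	if len(p.clients) == 1 {
		return p.clients[0] // fast path: nothing to rotate
	}
	// Atomic increment makes the rotation safe under concurrency.
	n := atomic.AddUint64(&p.index, 1)
	return p.clients[n%uint64(len(p.clients))]
}

func main() {
	p := &pool{clients: []string{"c0", "c1", "c2"}}
	for i := 0; i < 6; i++ {
		fmt.Print(p.pick(), " ") // c1 c2 c0 c1 c2 c0
	}
}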
62 changes: 48 additions & 14 deletions pkg/object/gs.go
@@ -26,7 +26,9 @@ import (
 	"net/url"
 	"os"
 	"sort"
+	"strconv"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"cloud.google.com/go/compute/metadata"
@@ -38,7 +40,8 @@ import (
 
 type gs struct {
 	DefaultObjectStorage
-	client    *storage.Client
+	clients   []*storage.Client
+	index     uint64
 	bucket    string
 	region    string
 	pageToken string
@@ -49,6 +52,14 @@ func (g *gs) String() string {
 	return fmt.Sprintf("gs://%s/", g.bucket)
 }
 
+func (g *gs) getClient() *storage.Client {
+	if len(g.clients) == 1 {
+		return g.clients[0]
+	}
+	n := atomic.AddUint64(&g.index, 1)
+	return g.clients[n%(uint64(len(g.clients)))]
+}
+
 func (g *gs) Create() error {
 	// check if the bucket already exists
 	if objs, err := g.List("", "", "", 1, true); err == nil && len(objs) > 0 {
@@ -76,7 +87,7 @@ func (g *gs) Create() error {
 		}
 	}
 
-	err := g.client.Bucket(g.bucket).Create(ctx, projectID, &storage.BucketAttrs{
+	err := g.getClient().Bucket(g.bucket).Create(ctx, projectID, &storage.BucketAttrs{
 		Name:         g.bucket,
 		StorageClass: g.sc,
 		Location:     g.region,
@@ -88,7 +99,7 @@
 }
 
 func (g *gs) Head(key string) (Object, error) {
-	attrs, err := g.client.Bucket(g.bucket).Object(key).Attrs(ctx)
+	attrs, err := g.getClient().Bucket(g.bucket).Object(key).Attrs(ctx)
 	if err != nil {
 		if err == storage.ErrObjectNotExist {
 			err = os.ErrNotExist
@@ -106,7 +117,7 @@ func (g *gs) Head(key string) (Object, error) {
 }
 
 func (g *gs) Get(key string, off, limit int64, getters ...AttrGetter) (io.ReadCloser, error) {
-	reader, err := g.client.Bucket(g.bucket).Object(key).NewRangeReader(ctx, off, limit)
+	reader, err := g.getClient().Bucket(g.bucket).Object(key).NewRangeReader(ctx, off, limit)
 	if err != nil {
 		return nil, err
 	}
@@ -117,9 +128,17 @@ func (g *gs) Get(key string, off, limit int64, getters ...AttrGetter) (io.ReadCloser, error) {
 }
 
 func (g *gs) Put(key string, data io.Reader, getters ...AttrGetter) error {
-	writer := g.client.Bucket(g.bucket).Object(key).NewWriter(ctx)
+	writer := g.getClient().Bucket(g.bucket).Object(key).NewWriter(ctx)
 	writer.StorageClass = g.sc
-	_, err := io.Copy(writer, data)
+
+	// If you upload small objects (< 16MiB), you should set ChunkSize
+	// to a value slightly larger than the objects' sizes to avoid memory bloat.
+	// This is especially important if you are uploading many small objects concurrently.
+	writer.ChunkSize = 5 << 20
+
+	buf := bufPool.Get().(*[]byte)
+	defer bufPool.Put(buf)
+	_, err := io.CopyBuffer(writer, data, *buf)
 	if err != nil {
 		return err
 	}
@@ -129,8 +148,9 @@ func (g *gs) Put(key string, data io.Reader, getters ...AttrGetter) error {
 }
 
 func (g *gs) Copy(dst, src string) error {
-	srcObj := g.client.Bucket(g.bucket).Object(src)
-	dstObj := g.client.Bucket(g.bucket).Object(dst)
+	client := g.getClient()
+	srcObj := client.Bucket(g.bucket).Object(src)
+	dstObj := client.Bucket(g.bucket).Object(dst)
 	copier := dstObj.CopierFrom(srcObj)
 	if g.sc != "" {
 		copier.StorageClass = g.sc
@@ -140,7 +160,7 @@ func (g *gs) Copy(dst, src string) error {
 }
 
 func (g *gs) Delete(key string, getters ...AttrGetter) error {
-	if err := g.client.Bucket(g.bucket).Object(key).Delete(ctx); err != storage.ErrObjectNotExist {
+	if err := g.getClient().Bucket(g.bucket).Object(key).Delete(ctx); err != storage.ErrObjectNotExist {
 		return err
 	}
 	return nil
@@ -151,7 +171,7 @@ func (g *gs) List(prefix, marker, delimiter string, limit int64, followLink bool
 		// last page
 		return nil, nil
 	}
-	objectIterator := g.client.Bucket(g.bucket).Objects(ctx, &storage.Query{Prefix: prefix, Delimiter: delimiter})
+	objectIterator := g.getClient().Bucket(g.bucket).Objects(ctx, &storage.Query{Prefix: prefix, Delimiter: delimiter})
 	pager := iterator.NewPager(objectIterator, int(limit), g.pageToken)
 	var entries []*storage.ObjectAttrs
 	nextPageToken, err := pager.NextPage(&entries)
@@ -195,11 +215,25 @@ func newGS(endpoint, accessKey, secretKey, token string) (ObjectStorage, error)
 		region = hostParts[1]
 	}
 
-	client, err := storage.NewClient(ctx)
-	if err != nil {
-		return nil, err
+	var size int
+	if ssize := os.Getenv("JFS_NUM_GOOGLE_CLIENTS"); ssize != "" {
+		if size, err = strconv.Atoi(ssize); err != nil {
+			return nil, err
+		}
 	}
+	if size < 1 {
+		size = 5
+	}
-	return &gs{client: client, bucket: bucket, region: region}, nil
+	clis := make([]*storage.Client, size)
+	for i := 0; i < size; i++ {
+		client, err := storage.NewClient(ctx)
+		if err != nil {
+			return nil, err
+		}
+		clis[i] = client
+	}
+
+	return &gs{clients: clis, bucket: bucket, region: region}, nil
 }
 
 func init() {
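A note on the Put change: besides routing through the pool, it swaps io.Copy for io.CopyBuffer with a buffer taken from bufPool, so copy buffers are reused across uploads instead of allocated per call. A minimal sketch of that pattern follows; the real bufPool is defined elsewhere in pkg/object, so the sync.Pool shown here and its 32 KiB buffer size are assumptions:

// Sketch of the buffer-reuse pattern in Put. The real bufPool lives
// elsewhere in pkg/object; its element size here is an assumption.
package main

import (
	"io"
	"os"
	"strings"
	"sync"
)

var bufPool = sync.Pool{
	New: func() interface{} {
		buf := make([]byte, 32<<10) // assumed size; buffers are reused across uploads
		return &buf
	},
}

func upload(dst io.Writer, src io.Reader) error {
	buf := bufPool.Get().(*[]byte)
	defer bufPool.Put(buf)
	// CopyBuffer copies through the supplied buffer instead of allocating one.
	_, err := io.CopyBuffer(dst, src, *buf)
	return err
}

func main() {
	_ = upload(os.Stdout, strings.NewReader("hello\n"))
}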

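On sizing: newGS reads the JFS_NUM_GOOGLE_CLIENTS environment variable; when it is unset, or parses to a value below 1, the pool defaults to 5 clients, while a value that strconv.Atoi cannot parse makes newGS return the parse error. Put also pins Writer.ChunkSize to 5 MiB (5 << 20), below the Go SDK's 16 MiB default, which bounds per-upload buffer memory when many objects are written concurrently.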