Skip to content

Commit

Permalink
object: list objects v2 (#5250)
Browse files Browse the repository at this point in the history
Co-authored-by: zhijian <[email protected]>
  • Loading branch information
davies and zhijian-pro authored Nov 15, 2024
1 parent 69008e6 commit d7e09db
Show file tree
Hide file tree
Showing 41 changed files with 434 additions and 346 deletions.
2 changes: 1 addition & 1 deletion cmd/objbench.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ func functionalTesting(blob object.ObjectStorage, result *[][]string, colorful b
if err := blob.Put(key, bytes.NewReader([]byte("1"))); err != nil {
return fmt.Errorf("put encode file failed: %s", err)
} else {
if resp, err := blob.List("", "测试编码文件", "", 1, true); err != nil && err != utils.ENOTSUP {
if resp, _, _, err := blob.List("", "测试编码文件", "", "", 1, true); err != nil && err != utils.ENOTSUP {
return fmt.Errorf("list encode file failed %s", err)
} else if len(resp) == 1 && resp[0].Key() != key {
return fmt.Errorf("list encode file failed: expect key %s, but got %s", key, resp[0].Key())
Expand Down
18 changes: 11 additions & 7 deletions cmd/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,9 @@ func (j *juiceFS) Head(key string) (object.Object, error) {
return &jObj{key, fi}, nil
}

func (j *juiceFS) List(prefix, marker, delimiter string, limit int64, followLink bool) ([]object.Object, error) {
func (j *juiceFS) List(prefix, marker, token, delimiter string, limit int64, followLink bool) ([]object.Object, bool, string, error) {
if delimiter != "/" {
return nil, utils.ENOTSUP
return nil, false, "", utils.ENOTSUP
}
dir := j.path(prefix)
var objs []object.Object
Expand All @@ -229,18 +229,18 @@ func (j *juiceFS) List(prefix, marker, delimiter string, limit int64, followLink
obj, err := j.Head(prefix)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
return nil, false, "", nil
}
return nil, err
return nil, false, "", err
}
objs = append(objs, obj)
}
entries, err := j.readDirSorted(dir, followLink)
if err != 0 {
if err == syscall.ENOENT {
return nil, nil
return nil, false, "", nil
}
return nil, err
return nil, false, "", err
}
for _, e := range entries {
key := dir[1:] + e.name
Expand All @@ -253,7 +253,11 @@ func (j *juiceFS) List(prefix, marker, delimiter string, limit int64, followLink
break
}
}
return objs, nil
var nextMarker string
if len(objs) > 0 {
nextMarker = objs[len(objs)-1].Key()
}
return objs, len(objs) == int(limit), nextMarker, nil
}

type mEntry struct {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ require (
github.com/vbauerster/mpb/v7 v7.0.3
github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8
github.com/vmware/go-nfs-client v0.0.0-20190605212624-d43b92724c1b
github.com/volcengine/ve-tos-golang-sdk/v2 v2.5.3
github.com/volcengine/ve-tos-golang-sdk/v2 v2.7.0
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a
go.etcd.io/etcd v3.3.27+incompatible
go.etcd.io/etcd/client/v3 v3.5.9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -902,8 +902,8 @@ github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVS
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8 h1:EVObHAr8DqpoJCVv6KYTle8FEImKhtkfcZetNqxDoJQ=
github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8/go.mod h1:dniwbG03GafCjFohMDmz6Zc6oCuiqgH6tGNyXTkHzXE=
github.com/volcengine/ve-tos-golang-sdk/v2 v2.5.3 h1:sc7EfqfTjMJtPtx8vYUDIL9WmmJtmamMFYxWF467IGw=
github.com/volcengine/ve-tos-golang-sdk/v2 v2.5.3/go.mod h1:IrjK84IJJTuOZOTMv/P18Ydjy/x+ow7fF7q11jAxXLM=
github.com/volcengine/ve-tos-golang-sdk/v2 v2.7.0 h1:MnTrrKb7gvWoI1W5GxVnjjzdSPmms4++JiR3ioqqoRc=
github.com/volcengine/ve-tos-golang-sdk/v2 v2.7.0/go.mod h1:IrjK84IJJTuOZOTMv/P18Ydjy/x+ow7fF7q11jAxXLM=
github.com/vultr/govultr v0.1.4/go.mod h1:9H008Uxr/C4vFNGLqKx232C206GL0PBHzOP0809bGNA=
github.com/willf/bitset v1.1.11 h1:N7Z7E9UvjW+sGsEl7k/SJrvY2reP1A07MrGuCjIOjRE=
github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI=
Expand Down
39 changes: 16 additions & 23 deletions pkg/object/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type wasb struct {
azblobCli *azblob.Client
sc string
cName string
marker string
}

func (b *wasb) String() string {
Expand Down Expand Up @@ -136,48 +135,42 @@ func (b *wasb) Delete(key string, getters ...AttrGetter) error {
return err
}

func (b *wasb) List(prefix, marker, delimiter string, limit int64, followLink bool) ([]Object, error) {
func (b *wasb) List(prefix, startAfter, token, delimiter string, limit int64, followLink bool) ([]Object, bool, string, error) {
if delimiter != "" {
return nil, notSupported
}
// todo
if marker != "" {
if b.marker == "" {
// last page
return nil, nil
}
marker = b.marker
return nil, false, "", notSupported
}

limit32 := int32(limit)

pager := b.azblobCli.NewListBlobsFlatPager(b.cName, &azblob.ListBlobsFlatOptions{Prefix: &prefix, Marker: &marker, MaxResults: &(limit32)})
pager := b.azblobCli.NewListBlobsFlatPager(b.cName, &azblob.ListBlobsFlatOptions{Prefix: &prefix, Marker: &token, MaxResults: &limit32})
page, err := pager.NextPage(ctx)
if err != nil {
return nil, err
}
if pager.More() {
b.marker = *page.NextMarker
} else {
b.marker = ""
return nil, false, "", err
}
var n int
if page.Segment != nil {
n = len(page.Segment.BlobItems)
}
objs := make([]Object, n)
objs := make([]Object, 0, n)
for i := 0; i < n; i++ {
blob := page.Segment.BlobItems[i]
if *blob.Name <= startAfter {
continue
}
mtime := blob.Properties.LastModified
objs[i] = &obj{
objs = append(objs, &obj{
*blob.Name,
*blob.Properties.ContentLength,
*mtime,
strings.HasSuffix(*blob.Name, "/"),
string(*blob.Properties.AccessTier),
}
})
}

var nextMarker string
if pager.More() {
nextMarker = *page.NextMarker
}
return objs, nil
return objs, pager.More(), nextMarker, nil
}

func (b *wasb) SetStorageClass(sc string) error {
Expand Down
26 changes: 12 additions & 14 deletions pkg/object/b2.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ import (

type b2client struct {
DefaultObjectStorage
bucket *backblaze.Bucket
nextMarker string
bucket *backblaze.Bucket
}

func (c *b2client) String() string {
Expand Down Expand Up @@ -122,33 +121,32 @@ func (c *b2client) Delete(key string, getters ...AttrGetter) error {
return err
}

func (c *b2client) List(prefix, marker, delimiter string, limit int64, followLink bool) ([]Object, error) {
func (c *b2client) List(prefix, startAfter, token, delimiter string, limit int64, followLink bool) ([]Object, bool, string, error) {
if limit > 1000 {
limit = 1000
}
if marker == "" && c.nextMarker != "" {
marker = c.nextMarker
c.nextMarker = ""
}
resp, err := c.bucket.ListFileNamesWithPrefix(marker, int(limit), prefix, delimiter)

resp, err := c.bucket.ListFileNamesWithPrefix(token, int(limit), prefix, delimiter)
if err != nil {
return nil, err
return nil, false, "", err
}

n := len(resp.Files)
objs := make([]Object, n)
objs := make([]Object, 0, n)
for i := 0; i < n; i++ {
if resp.Files[i].Name <= startAfter {
continue
}
f := resp.Files[i]
objs[i] = &obj{
objs = append(objs, &obj{
f.Name,
f.ContentLength,
time.Unix(f.UploadTimestamp/1000, 0),
strings.HasSuffix(f.Name, "/"),
"",
}
})
}
c.nextMarker = resp.NextFileName
return objs, nil
return objs, resp.NextFileName != "", resp.NextFileName, nil
}

// TODO: support multipart upload using S3 client
Expand Down
8 changes: 4 additions & 4 deletions pkg/object/bos.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,14 @@ func (q *bosclient) Delete(key string, getters ...AttrGetter) error {
return err
}

func (q *bosclient) List(prefix, marker, delimiter string, limit int64, followLink bool) ([]Object, error) {
func (q *bosclient) List(prefix, start, token, delimiter string, limit int64, followLink bool) ([]Object, bool, string, error) {
if limit > 1000 {
limit = 1000
}
limit_ := int(limit)
out, err := q.c.SimpleListObjects(q.bucket, prefix, limit_, marker, delimiter)
out, err := q.c.SimpleListObjects(q.bucket, prefix, limit_, start, delimiter)
if err != nil {
return nil, err
return nil, false, "", err
}
n := len(out.Contents)
objs := make([]Object, n)
Expand All @@ -167,7 +167,7 @@ func (q *bosclient) List(prefix, marker, delimiter string, limit int64, followLi
}
sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() })
}
return objs, nil
return objs, out.IsTruncated, out.NextMarker, nil
}

func (q *bosclient) CreateMultipartUpload(key string) (*MultipartUpload, error) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/object/bunny.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func (b *bunnyClient) Delete(key string, getters ...AttrGetter) error {
return err
}

func (b *bunnyClient) List(prefix, marker, delimiter string, limit int64, followLink bool) ([]Object, error) {
func (b *bunnyClient) List(prefix, marker, token, delimiter string, limit int64, followLink bool) ([]Object, bool, string, error) {
if delimiter != "/" {
return nil, notSupported
return nil, false, "", notSupported
}
var output []Object
var dir = prefix
Expand All @@ -95,7 +95,7 @@ func (b *bunnyClient) List(prefix, marker, delimiter string, limit int64, follow
if os.IsNotExist(err) {
err = nil
}
return nil, err
return nil, false, "", err
}

for _, o := range listedObjects {
Expand All @@ -109,7 +109,7 @@ func (b *bunnyClient) List(prefix, marker, delimiter string, limit int64, follow
}
}

return output, nil
return generateListResult(output, limit)
}

// The Object Path returned by the Bunny API contains the Storage Zone Name, which this function removes
Expand Down
18 changes: 6 additions & 12 deletions pkg/object/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,23 +166,17 @@ func (c *COS) Delete(key string, getters ...AttrGetter) error {
return err
}

func (c *COS) List(prefix, marker, delimiter string, limit int64, followLink bool) ([]Object, error) {
func (c *COS) List(prefix, start, token, delimiter string, limit int64, followLink bool) ([]Object, bool, string, error) {
param := cos.BucketGetOptions{
Prefix: prefix,
Marker: marker,
Marker: start,
MaxKeys: int(limit),
Delimiter: delimiter,
EncodingType: "url",
}
resp, _, err := c.c.Bucket.Get(ctx, &param)
for err == nil && len(resp.Contents) == 0 && resp.IsTruncated {
if param.Marker, err = cos.DecodeURIComponent(resp.NextMarker); err != nil {
return nil, errors.WithMessagef(err, "failed to decode nextMarker %s", resp.NextMarker)
}
resp, _, err = c.c.Bucket.Get(ctx, &param)
}
if err != nil {
return nil, err
return nil, false, "", err
}
n := len(resp.Contents)
objs := make([]Object, n)
Expand All @@ -191,21 +185,21 @@ func (c *COS) List(prefix, marker, delimiter string, limit int64, followLink boo
t, _ := time.Parse(time.RFC3339, o.LastModified)
key, err := cos.DecodeURIComponent(o.Key)
if err != nil {
return nil, errors.WithMessagef(err, "failed to decode key %s", o.Key)
return nil, false, "", errors.WithMessagef(err, "failed to decode key %s", o.Key)
}
objs[i] = &obj{key, int64(o.Size), t, strings.HasSuffix(key, "/"), o.StorageClass}
}
if delimiter != "" {
for _, p := range resp.CommonPrefixes {
key, err := cos.DecodeURIComponent(p)
if err != nil {
return nil, errors.WithMessagef(err, "failed to decode commonPrefixes %s", p)
return nil, false, "", errors.WithMessagef(err, "failed to decode commonPrefixes %s", p)
}
objs = append(objs, &obj{key, 0, time.Unix(0, 0), true, ""})
}
sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() })
}
return objs, nil
return objs, resp.IsTruncated, resp.NextMarker, nil
}

func (c *COS) ListAll(prefix, marker string, followLink bool) (<-chan Object, error) {
Expand Down
17 changes: 8 additions & 9 deletions pkg/object/dragonfly.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (d *dragonfly) String() string {

// Create creates the object if it does not exist.
func (d *dragonfly) Create() error {
if _, err := d.List("", "", "", 1, false); err == nil {
if _, _, _, err := d.List("", "", "", "", 1, false); err == nil {
return nil
}

Expand Down Expand Up @@ -438,14 +438,14 @@ func (d *dragonfly) Delete(key string, getters ...AttrGetter) error {
}

// List lists the objects with the given prefix.
func (d *dragonfly) List(prefix, marker, delimiter string, limit int64, followLink bool) ([]Object, error) {
func (d *dragonfly) List(prefix, marker, token, delimiter string, limit int64, followLink bool) ([]Object, bool, string, error) {
if limit > MaxGetObjectMetadatasLimit {
limit = MaxGetObjectMetadatasLimit
}

u, err := url.Parse(d.endpoint)
if err != nil {
return nil, err
return nil, false, "", err
}

u.Path = path.Join("buckets", d.bucket, "metadatas")
Expand All @@ -469,23 +469,23 @@ func (d *dragonfly) List(prefix, marker, delimiter string, limit int64, followLi
u.RawQuery = query.Encode()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
return nil, false, "", err
}

// List object.
resp, err := d.client.Do(req)
if err != nil {
return nil, err
return nil, false, "", err
}
defer resp.Body.Close()

if resp.StatusCode/100 != 2 {
return nil, fmt.Errorf("bad response status %s", resp.Status)
return nil, false, "", fmt.Errorf("bad response status %s", resp.Status)
}

var objectMetadatas ObjectMetadatas
if err := json.NewDecoder(resp.Body).Decode(&objectMetadatas); err != nil {
return nil, err
return nil, false, "", err
}

objs := make([]Object, 0, len(objectMetadatas.Metadatas))
Expand All @@ -505,8 +505,7 @@ func (d *dragonfly) List(prefix, marker, delimiter string, limit int64, followLi
}
sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() })
}

return objs, err
return generateListResult(objs, limit)
}

// newDragonfly creates a new dragonfly object storage.
Expand Down
Loading

0 comments on commit d7e09db

Please sign in to comment.