Skip to content

Commit

Permalink
fix azure
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijian-pro committed Nov 11, 2024
1 parent de9d11d commit e8c8b77
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 38 deletions.
38 changes: 18 additions & 20 deletions pkg/object/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,57 +144,55 @@ func (b *wasb) List(prefix, marker, delimiter string, limit int64, followLink bo
}
marker = b.marker
}
objs, _, nextMarker, err := b.ListV2(prefix, marker, "", delimiter, limit, followLink)
objs, _, nextMarker, err := b.ListV2(prefix, "", marker, delimiter, limit, followLink)
b.marker = nextMarker
return objs, err
}

func (b *wasb) ListV2(prefix, start, token, delimiter string, limit int64, followLink bool) ([]Object, bool, string, error) {

if start != "" {
if b.marker == "" {
// last page
return nil, false, "", nil
}
start = b.marker
}

if delimiter != "" {
return nil, false, "", notSupported
}

limit32 := int32(limit)
pager := b.azblobCli.NewListBlobsFlatPager(b.cName, &azblob.ListBlobsFlatOptions{Prefix: &prefix, Marker: &start, MaxResults: &(limit32)})
pager := b.azblobCli.NewListBlobsFlatPager(b.cName, &azblob.ListBlobsFlatOptions{Prefix: &prefix, Marker: &token, MaxResults: &(limit32)})
page, err := pager.NextPage(ctx)
if err != nil {
return nil, false, "", err
}
if pager.More() {
b.marker = *page.NextMarker
} else {
b.marker = ""
}
var n int
if page.Segment != nil {
n = len(page.Segment.BlobItems)
}
objs := make([]Object, n)
objs := make([]Object, 0, n)
for i := 0; i < n; i++ {
blob := page.Segment.BlobItems[i]
if *blob.Name <= start {
continue
}
mtime := blob.Properties.LastModified
objs[i] = &obj{
objs = append(objs, &obj{
*blob.Name,
*blob.Properties.ContentLength,
*mtime,
strings.HasSuffix(*blob.Name, "/"),
string(*blob.Properties.AccessTier),
}
})
}
var nextMarker string
var hasMore bool

if pager.More() {
nextMarker = *page.NextMarker
hasMore = true
}
var objsContent []Object
for limit-int64(len(objs)) > 0 && hasMore {
objsContent, hasMore, nextMarker, err = b.ListV2(prefix, start, nextMarker, delimiter, limit-int64(len(objs)), followLink)

Check failure on line 191 in pkg/object/azure.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)
objs = append(objs, objsContent...)
}
return objs, pager.More(), nextMarker, nil

return objs, hasMore, nextMarker, nil
}

func (b *wasb) SetStorageClass(sc string) error {
Expand Down
7 changes: 4 additions & 3 deletions pkg/object/object_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,14 @@ type listThread struct {
err error
entries []Object
nextToken string
hasMore bool
}

func ListAllWithDelimiter(store ObjectStorage, prefix, start, end string, followLink bool) (<-chan Object, error) {

var entries []Object
var err error
entries, _, _, err = ListWrap(store, prefix, start, "", "/", 1e9, followLink)
entries, _, _, err = ListV2(store, prefix, start, "", "/", 1e9, followLink)
if err != nil {
logger.Errorf("list %s: %s", prefix, err)
return nil, err
Expand All @@ -223,7 +224,7 @@ func ListAllWithDelimiter(store ObjectStorage, prefix, start, end string, follow
if !entries[i].IsDir() || key == prefix {
continue
}
t.entries, _, t.nextToken, t.err = ListWrap(store, key, "\x00", t.nextToken, "/", 1e9, followLink) // exclude itself
t.entries, t.hasMore, t.nextToken, t.err = ListV2(store, key, "\x00", t.nextToken, "/", 1e9, followLink) // exclude itself
t.Lock()
t.ready = true
t.cond.Signal()
Expand Down Expand Up @@ -286,7 +287,7 @@ func ListAllWithDelimiter(store ObjectStorage, prefix, start, end string, follow
return listed, nil
}

func ListWrap(store ObjectStorage, prefix, start, token, delimiter string, limit int64, followLink bool) ([]Object, bool, string, error) {
func ListV2(store ObjectStorage, prefix, start, token, delimiter string, limit int64, followLink bool) ([]Object, bool, string, error) {
var objs []Object
var err error
var nextToken string
Expand Down
8 changes: 4 additions & 4 deletions pkg/object/object_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func testStorage(t *testing.T, s ObjectStorage) {
if scPut != sc {
t.Fatalf("Storage class should be %q, got %q", sc, scPut)
}
if resp, _, _, err := ListWrap(s, "测试编码文件", "", "", "", 1, true); err != nil && err != notSupported {
if resp, _, _, err := ListV2(s, "测试编码文件", "", "", "", 1, true); err != nil && err != notSupported {
t.Logf("List testEncodeFile Failed: %s", err)
} else if len(resp) == 1 && resp[0].Key() != key {
t.Logf("List testEncodeFile Failed: expect key %s, but got %s", key, resp[0].Key())
Expand Down Expand Up @@ -315,7 +315,7 @@ func testStorage(t *testing.T, s ObjectStorage) {
}
}

if obs, _, _, err := ListWrap(s, "", "", "", "/", 10, true); err != nil {
if obs, _, _, err := ListV2(s, "", "", "", "/", 10, true); err != nil {
if !errors.Is(err, notSupported) {
t.Fatalf("list with delimiter: %s", err)
} else {
Expand All @@ -341,7 +341,7 @@ func testStorage(t *testing.T, s ObjectStorage) {
}
}

if obs, _, _, err := ListWrap(s, "a", "", "", "/", 10, true); err != nil {
if obs, _, _, err := ListV2(s, "a", "", "", "/", 10, true); err != nil {
if !errors.Is(err, notSupported) {
t.Fatalf("list with delimiter: %s", err)
}
Expand All @@ -357,7 +357,7 @@ func testStorage(t *testing.T, s ObjectStorage) {
}
}

if obs, _, _, err := ListWrap(s, "a/", "", "", "/", 10, true); err != nil {
if obs, _, _, err := ListV2(s, "a/", "", "", "/", 10, true); err != nil {
if !errors.Is(err, notSupported) {
t.Fatalf("list with delimiter: %s", err)
} else {
Expand Down
9 changes: 5 additions & 4 deletions pkg/object/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ func ListAll(store ObjectStorage, prefix, marker string, followLink bool) (<-cha
var objs []Object
var err error
var nextToken string
objs, _, nextToken, err = ListWrap(store, prefix, marker, "", "", maxResults, followLink)
var hasMore bool
objs, hasMore, nextToken, err = ListV2(store, prefix, marker, "", "", maxResults, followLink)
if errors.Is(err, notSupported) {
return ListAllWithDelimiter(store, prefix, marker, "", followLink)
}
Expand All @@ -117,7 +118,7 @@ func ListAll(store ObjectStorage, prefix, marker string, followLink bool) (<-cha
lastkey := ""
first := true
END:
for len(objs) > 0 {
for hasMore {
for _, obj := range objs {
key := obj.Key()
if !first && key <= lastkey {
Expand All @@ -139,12 +140,12 @@ func ListAll(store ObjectStorage, prefix, marker string, followLink bool) (<-cha
marker = lastkey
startTime = time.Now()
logger.Debugf("Continue listing objects from %s marker %q", store, marker)
objs, _, nextToken, err = ListWrap(store, prefix, marker, nextToken, "", maxResults, followLink)
objs, hasMore, nextToken, err = ListV2(store, prefix, marker, nextToken, "", maxResults, followLink)
for err != nil {
logger.Warnf("Fail to list: %s, retry again", err.Error())
// slow down
time.Sleep(time.Millisecond * 100)
objs, _, nextToken, err = ListWrap(store, prefix, marker, nextToken, "", maxResults, followLink)
objs, hasMore, nextToken, err = ListV2(store, prefix, marker, nextToken, "", maxResults, followLink)
}
logger.Debugf("Found %d object from %s in %s", len(objs), store, time.Since(startTime))
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func ListAll(store object.ObjectStorage, prefix, start, end string, followLink b
var objs []object.Object
var nextToken string
var err error
objs, _, nextToken, err = object.ListWrap(store, prefix, marker, "", "", maxResults, followLink)
var hasMore bool
objs, hasMore, nextToken, err = object.ListV2(store, prefix, marker, "", "", maxResults, followLink)
if errors.Is(err, utils.ENOTSUP) {
return object.ListAllWithDelimiter(store, prefix, start, end, followLink)
}
Expand All @@ -125,7 +126,7 @@ func ListAll(store object.ObjectStorage, prefix, start, end string, followLink b
lastkey := ""
first := true
END:
for len(objs) > 0 {
for hasMore {
for _, obj := range objs {
key := obj.Key()
if !first && key <= lastkey {
Expand All @@ -151,14 +152,14 @@ func ListAll(store object.ObjectStorage, prefix, start, end string, followLink b
startTime = time.Now()
logger.Debugf("Continue listing objects from %s marker %q", store, marker)

objs, _, nextToken, err = object.ListWrap(store, prefix, marker, nextToken, "", maxResults, followLink)
objs, hasMore, nextToken, err = object.ListV2(store, prefix, marker, nextToken, "", maxResults, followLink)
count := 0
for err != nil && count < 3 {
logger.Warnf("Fail to list: %s, retry again", err.Error())
// slow down
time.Sleep(time.Millisecond * 100)

objs, _, nextToken, err = object.ListWrap(store, prefix, marker, nextToken, "", maxResults, followLink)
objs, hasMore, nextToken, err = object.ListV2(store, prefix, marker, nextToken, "", maxResults, followLink)
count++
}
logger.Debugf("Found %d object from %s in %s", len(objs), store, time.Since(startTime))
Expand Down Expand Up @@ -1102,13 +1103,13 @@ func listCommonPrefix(store object.ObjectStorage, prefix string, cp chan object.
var err error
var nextToken string
var marker string

var hasMore bool
for {
objs, _, nextToken, err = object.ListWrap(store, prefix, marker, nextToken, "/", maxResults, followLink)
objs, hasMore, nextToken, err = object.ListV2(store, prefix, marker, nextToken, "/", maxResults, followLink)
if err != nil {
return nil, err
}
if len(objs) == 0 {
if !hasMore {
break
}
total = append(total, objs...)
Expand Down

0 comments on commit e8c8b77

Please sign in to comment.