From a585e81766042ff736e3c1a771945542a03b0c6d Mon Sep 17 00:00:00 2001 From: MegaByte875 Date: Thu, 1 Feb 2024 02:53:32 +0800 Subject: [PATCH] fix gcsfs lib bugs --- pkg/storage/gs.go | 192 +++++++++++++++++++++++++--------------------- 1 file changed, 106 insertions(+), 86 deletions(-) diff --git a/pkg/storage/gs.go b/pkg/storage/gs.go index ca5cf7b..5f8e898 100644 --- a/pkg/storage/gs.go +++ b/pkg/storage/gs.go @@ -137,44 +137,24 @@ func (g *GS) ExistDir(ctx context.Context, uri string) bool { return false } - cx, cancel := context.WithDeadline(ctx, time.Now().Add(defaultTimout)) - defer cancel() + bucket := b.GetGs().Bucket + path := b.GetGs().Path + prefix := getPrefix(path) + + if prefix == "" { + return false + } - bkt := g.client.Bucket(b.GetGs().Bucket) query := &storage.Query{ - Prefix: b.GetGs().Path, + Prefix: prefix, Delimiter: "/", } - it := bkt.Objects(cx, query) - pager := iterator.NewPager(it, defaultGCSPageSize, "") - - exist := false - for { - var objects []*storage.ObjectAttrs - pageToken, err := pager.NextPage(&objects) - if err != nil { - log.WithField("uri", uri).Errorf("ExistDir, get nex page failed: %v", err) - return false - } - - for _, attrs := range objects { - if !attrs.Deleted.IsZero() { - continue - } - isDir := strings.HasSuffix(attrs.Name, "/") - if isDir && attrs.Size == 0 { - exist = true - break - } - } - - objects = nil - if pageToken == "" { - break - } + it := g.client.Bucket(bucket).Objects(context.Background(), query) + if _, err := it.Next(); err == nil { + return true } - log.WithField("uri", uri).Debugf("ExistDir %s: %v.", uri, exist) - return true + + return false } func (g *GS) EnsureDir(ctx context.Context, uri string, recursively bool) error { @@ -202,56 +182,51 @@ func (g *GS) ListDir(ctx context.Context, uri string) ([]string, error) { return nil, fmt.Errorf("ListDir, check and set uri %s failed: %w", uri, err) } - prefix := b.GetGs().Path - if !strings.HasSuffix(prefix, "/") { - prefix += "/" + bucket := b.GetGs().Bucket + path := b.GetGs().Path + prefix := getPrefix(path) + + names, err := g.readdirNames(bucket, prefix) + if err != nil { + return nil, err } - cx, cancel := context.WithDeadline(ctx, time.Now().Add(defaultTimout)) - defer cancel() + return names, nil +} - names := make([]string, 0) - bkt := g.client.Bucket(b.GetGs().Bucket) - query := &storage.Query{ - Prefix: prefix, - Delimiter: "/", +func (g *GS) RemoveDir(ctx context.Context, uri string) error { + pathInfo, err := g.gcsFS.Stat(uri) + if errors.Is(err, gcsfs.ErrFileNotFound) { + // return early if file doesn't exist + return nil + } + if err != nil { + return err } - it := bkt.Objects(cx, query) - pager := iterator.NewPager(it, defaultGCSPageSize, "") + if !pathInfo.IsDir() { + return g.gcsFS.Remove(uri) + } - for { - var objects []*storage.ObjectAttrs - pageToken, err := pager.NextPage(&objects) - if err != nil { - return nil, fmt.Errorf("ListDir, get nex page failed: %v", err) - } + b := g.backend.DeepCopy() + if err := b.SetUri(uri); err != nil { + return fmt.Errorf("RemoveDir, check and set uri %s failed: %w", uri, err) + } - for _, attrs := range objects { - name, err := filepath.Rel(prefix, attrs.Prefix) - if err != nil { - return nil, fmt.Errorf("ListDir, get relative path failed: %v", err) - } - if strings.Contains(name, ".") { - continue - } - names = append(names, name) - } + bucket := b.GetGs().Bucket + path := b.GetGs().Path + prefix := getPrefix(path) - objects = nil - if pageToken == "" { - break - } + names, err := g.readdirNames(bucket, prefix) + if err != nil { + return err } - return names, nil -} - -func (g *GS) RemoveDir(ctx context.Context, uri string) error { - if err := g.gcsFS.RemoveAll(uri); err != nil { - if errors.Is(err, gcsfs.ErrFileNotFound) { - return nil + for _, name := range names { + subUri := uri + "/" + name + err = g.RemoveDir(ctx, subUri) + if err != nil { + return err } - return err } return nil } @@ -316,12 +291,11 @@ func (g *GS) uploadPrefix(prefix, localDir string) error { }() for path := range walker { - rel, err := filepath.Rel(localDir, path) + relPath, err := filepath.Rel(localDir, path) if err != nil { - log.WithError(err).Errorf("Unable to get relative path: %s.", path) + return fmt.Errorf("unable to get relative path: %s", path) } - key := filepath.Join(prefix, rel) - + key := filepath.Join(prefix, relPath) err = g.uploadToStorage(key, path) if err != nil { return fmt.Errorf("upload from %s to %s failed: %w", path, key, err) @@ -392,7 +366,7 @@ func (g *GS) downloadToDir(localPath, uri, baseUri string) error { if !pathInfo.IsDir() { relPath, err := filepath.Rel(baseUri, uri) if err != nil { - return err + return fmt.Errorf("unable to get relative path: %s", relPath) } localFile := filepath.Join(localPath, relPath) objectKey, err := getObjectKey(uri) @@ -402,19 +376,21 @@ func (g *GS) downloadToDir(localPath, uri, baseUri string) error { return g.downloadToFile(localFile, objectKey) } - var dir *gcsfs.GcsFile - dir, err = g.gcsFS.Open(uri) - if err != nil { - return err + b := g.backend.DeepCopy() + if err := b.SetUri(uri); err != nil { + return fmt.Errorf("downloadToDir, check and set uri %s failed: %w", uri, err) } - var infos []os.FileInfo - infos, err = dir.Readdir(0) + bucket := b.GetGs().Bucket + path := b.GetGs().Path + prefix := getPrefix(path) + + names, err := g.readdirNames(bucket, prefix) if err != nil { return err } - for _, info := range infos { - subUri := uri + "/" + info.Name() + for _, name := range names { + subUri := uri + "/" + name err = g.downloadToDir(localPath, subUri, baseUri) if err != nil { return err @@ -423,6 +399,39 @@ func (g *GS) downloadToDir(localPath, uri, baseUri string) error { return nil } +func (g *GS) readdirNames(bucket, prefix string) ([]string, error) { + names := make([]string, 0) + query := &storage.Query{ + Prefix: prefix, + Delimiter: "/", + } + it := g.client.Bucket(bucket).Objects(context.Background(), query) + for { + object, err := it.Next() + if err == iterator.Done { + break + } + if err != nil { + return nil, fmt.Errorf("Bucket(%q).Objects(): %w", bucket, err) + } + if object.Name != "" { + fileEntry := filepath.Base(object.Name) + names = append(names, fileEntry) + continue + } + if object.Prefix != "" { + // It's a virtual folder! It does not have a name, but prefix - this is how GCS API + // deals with them at the moment + dirEntry := filepath.Base(object.Prefix) + if err != nil { + return nil, fmt.Errorf("Bucket(%q).Objects(): %v", bucket, err) + } + names = append(names, dirEntry) + } + } + return names, nil +} + func getObjectKey(uri string) (string, error) { u, err := url.Parse(uri) if err != nil { @@ -430,3 +439,14 @@ func getObjectKey(uri string) (string, error) { } return strings.TrimLeft(u.Path, "/ "), nil } + +func getPrefix(path string) string { + prefix := "" + if path != "" && path != "." && path != "/" { + prefix = strings.TrimPrefix(path, "/") + if !strings.HasSuffix(prefix, "/") { + prefix += "/" + } + } + return prefix +}