Skip to content

Commit

Permalink
fix gcsfs lib bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
MegaByte875 committed Jan 31, 2024
1 parent f6c6e3a commit a585e81
Showing 1 changed file with 106 additions and 86 deletions.
192 changes: 106 additions & 86 deletions pkg/storage/gs.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,44 +137,24 @@ func (g *GS) ExistDir(ctx context.Context, uri string) bool {
return false
}

cx, cancel := context.WithDeadline(ctx, time.Now().Add(defaultTimout))
defer cancel()
bucket := b.GetGs().Bucket
path := b.GetGs().Path
prefix := getPrefix(path)

if prefix == "" {
return false
}

bkt := g.client.Bucket(b.GetGs().Bucket)
query := &storage.Query{
Prefix: b.GetGs().Path,
Prefix: prefix,
Delimiter: "/",
}
it := bkt.Objects(cx, query)
pager := iterator.NewPager(it, defaultGCSPageSize, "")

exist := false
for {
var objects []*storage.ObjectAttrs
pageToken, err := pager.NextPage(&objects)
if err != nil {
log.WithField("uri", uri).Errorf("ExistDir, get nex page failed: %v", err)
return false
}

for _, attrs := range objects {
if !attrs.Deleted.IsZero() {
continue
}
isDir := strings.HasSuffix(attrs.Name, "/")
if isDir && attrs.Size == 0 {
exist = true
break
}
}

objects = nil
if pageToken == "" {
break
}
it := g.client.Bucket(bucket).Objects(context.Background(), query)
if _, err := it.Next(); err == nil {
return true
}
log.WithField("uri", uri).Debugf("ExistDir %s: %v.", uri, exist)
return true

return false
}

func (g *GS) EnsureDir(ctx context.Context, uri string, recursively bool) error {
Expand Down Expand Up @@ -202,56 +182,51 @@ func (g *GS) ListDir(ctx context.Context, uri string) ([]string, error) {
return nil, fmt.Errorf("ListDir, check and set uri %s failed: %w", uri, err)
}

prefix := b.GetGs().Path
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
bucket := b.GetGs().Bucket
path := b.GetGs().Path
prefix := getPrefix(path)

names, err := g.readdirNames(bucket, prefix)
if err != nil {
return nil, err
}

cx, cancel := context.WithDeadline(ctx, time.Now().Add(defaultTimout))
defer cancel()
return names, nil
}

names := make([]string, 0)
bkt := g.client.Bucket(b.GetGs().Bucket)
query := &storage.Query{
Prefix: prefix,
Delimiter: "/",
func (g *GS) RemoveDir(ctx context.Context, uri string) error {
pathInfo, err := g.gcsFS.Stat(uri)
if errors.Is(err, gcsfs.ErrFileNotFound) {
// return early if file doesn't exist
return nil
}
if err != nil {
return err
}

it := bkt.Objects(cx, query)
pager := iterator.NewPager(it, defaultGCSPageSize, "")
if !pathInfo.IsDir() {
return g.gcsFS.Remove(uri)
}

for {
var objects []*storage.ObjectAttrs
pageToken, err := pager.NextPage(&objects)
if err != nil {
return nil, fmt.Errorf("ListDir, get nex page failed: %v", err)
}
b := g.backend.DeepCopy()
if err := b.SetUri(uri); err != nil {
return fmt.Errorf("RemoveDir, check and set uri %s failed: %w", uri, err)
}

for _, attrs := range objects {
name, err := filepath.Rel(prefix, attrs.Prefix)
if err != nil {
return nil, fmt.Errorf("ListDir, get relative path failed: %v", err)
}
if strings.Contains(name, ".") {
continue
}
names = append(names, name)
}
bucket := b.GetGs().Bucket
path := b.GetGs().Path
prefix := getPrefix(path)

objects = nil
if pageToken == "" {
break
}
names, err := g.readdirNames(bucket, prefix)
if err != nil {
return err
}
return names, nil
}

func (g *GS) RemoveDir(ctx context.Context, uri string) error {
if err := g.gcsFS.RemoveAll(uri); err != nil {
if errors.Is(err, gcsfs.ErrFileNotFound) {
return nil
for _, name := range names {
subUri := uri + "/" + name
err = g.RemoveDir(ctx, subUri)
if err != nil {
return err
}
return err
}
return nil
}
Expand Down Expand Up @@ -316,12 +291,11 @@ func (g *GS) uploadPrefix(prefix, localDir string) error {
}()

for path := range walker {
rel, err := filepath.Rel(localDir, path)
relPath, err := filepath.Rel(localDir, path)
if err != nil {
log.WithError(err).Errorf("Unable to get relative path: %s.", path)
return fmt.Errorf("unable to get relative path: %s", path)
}
key := filepath.Join(prefix, rel)

key := filepath.Join(prefix, relPath)
err = g.uploadToStorage(key, path)
if err != nil {
return fmt.Errorf("upload from %s to %s failed: %w", path, key, err)
Expand Down Expand Up @@ -392,7 +366,7 @@ func (g *GS) downloadToDir(localPath, uri, baseUri string) error {
if !pathInfo.IsDir() {
relPath, err := filepath.Rel(baseUri, uri)
if err != nil {
return err
return fmt.Errorf("unable to get relative path: %s", relPath)
}
localFile := filepath.Join(localPath, relPath)
objectKey, err := getObjectKey(uri)
Expand All @@ -402,19 +376,21 @@ func (g *GS) downloadToDir(localPath, uri, baseUri string) error {
return g.downloadToFile(localFile, objectKey)
}

var dir *gcsfs.GcsFile
dir, err = g.gcsFS.Open(uri)
if err != nil {
return err
b := g.backend.DeepCopy()
if err := b.SetUri(uri); err != nil {
return fmt.Errorf("downloadToDir, check and set uri %s failed: %w", uri, err)
}

var infos []os.FileInfo
infos, err = dir.Readdir(0)
bucket := b.GetGs().Bucket
path := b.GetGs().Path
prefix := getPrefix(path)

names, err := g.readdirNames(bucket, prefix)
if err != nil {
return err
}
for _, info := range infos {
subUri := uri + "/" + info.Name()
for _, name := range names {
subUri := uri + "/" + name
err = g.downloadToDir(localPath, subUri, baseUri)
if err != nil {
return err
Expand All @@ -423,10 +399,54 @@ func (g *GS) downloadToDir(localPath, uri, baseUri string) error {
return nil
}

// readdirNames lists the entries directly under prefix in bucket and returns
// their base names.
//
// The listing uses "/" as the delimiter, so the iterator yields two kinds of
// results: real objects (Name is set) and virtual folders (only Prefix is
// set). Both are reported by their last path element.
//
// prefix is expected to be either empty or to end with "/" (see getPrefix).
// Any iterator error other than iterator.Done aborts the listing and is
// returned wrapped.
func (g *GS) readdirNames(bucket, prefix string) ([]string, error) {
	names := make([]string, 0)
	query := &storage.Query{
		Prefix:    prefix,
		Delimiter: "/",
	}
	it := g.client.Bucket(bucket).Objects(context.Background(), query)
	for {
		object, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("Bucket(%q).Objects(): %w", bucket, err)
		}

		switch {
		case object.Name != "":
			// A placeholder object named exactly like the prefix represents
			// the directory itself; reporting it would make the directory
			// appear as its own child and send recursive callers to a
			// non-existent "<dir>/<dir>" path.
			if object.Name == prefix {
				continue
			}
			names = append(names, filepath.Base(object.Name))
		case object.Prefix != "":
			// It's a virtual folder! It does not have a name, but prefix -
			// this is how GCS API deals with them at the moment.
			names = append(names, filepath.Base(object.Prefix))
		}
	}
	return names, nil
}

// getObjectKey extracts the object key from uri: the URL path with every
// leading "/" and space character stripped. It returns the url.Parse error
// unchanged when uri cannot be parsed.
func getObjectKey(uri string) (string, error) {
	parsed, parseErr := url.Parse(uri)
	if parseErr != nil {
		return "", parseErr
	}
	key := strings.TrimLeft(parsed.Path, "/ ")
	return key, nil
}

// getPrefix converts path into a listing prefix: one leading "/" is removed
// and a trailing "/" is appended when missing. The paths "", "." and "/"
// yield the empty prefix.
func getPrefix(path string) string {
	switch path {
	case "", ".", "/":
		return ""
	}

	trimmed := strings.TrimPrefix(path, "/")
	if strings.HasSuffix(trimmed, "/") {
		return trimmed
	}
	return trimmed + "/"
}

0 comments on commit a585e81

Please sign in to comment.