From 0c562a5bcaed82ac77de950d2947d864733236ce Mon Sep 17 00:00:00 2001 From: MegaByte875 Date: Wed, 21 Feb 2024 11:09:01 +0800 Subject: [PATCH] fix download bug --- pkg/storage/gs.go | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/pkg/storage/gs.go b/pkg/storage/gs.go index 5f8e898..c0f224a 100644 --- a/pkg/storage/gs.go +++ b/pkg/storage/gs.go @@ -183,9 +183,7 @@ func (g *GS) ListDir(ctx context.Context, uri string) ([]string, error) { } bucket := b.GetGs().Bucket - path := b.GetGs().Path - prefix := getPrefix(path) - + prefix := getPrefix(b.GetGs().Path) names, err := g.readdirNames(bucket, prefix) if err != nil { return nil, err @@ -197,10 +195,12 @@ func (g *GS) ListDir(ctx context.Context, uri string) ([]string, error) { func (g *GS) RemoveDir(ctx context.Context, uri string) error { pathInfo, err := g.gcsFS.Stat(uri) if errors.Is(err, gcsfs.ErrFileNotFound) { + log.Errorf("URI %s not found", uri) // return early if file doesn't exist return nil } if err != nil { + log.Errorf("stat URI %s failed: %v", uri, err) return err } @@ -272,12 +272,15 @@ func (g *GS) uploadToStorage(key, file string) error { //} //o = o.If(storage.Conditions{GenerationMatch: attrs.Generation}) wc := o.NewWriter(context.Background()) - if _, err = io.Copy(wc, f); err != nil { + written, err := io.Copy(wc, f) + if err != nil { return fmt.Errorf("io.Copy: %w", err) } if err := wc.Close(); err != nil { return fmt.Errorf("Writer.Close: %w", err) } + + log.Infof("Upload from %s to %s successfully, bytes=%d", file, key, written) return nil } @@ -302,7 +305,7 @@ func (g *GS) uploadPrefix(prefix, localDir string) error { } } - log.Debugf("Upload from %s to %s recursively.", localDir, g.backend.Uri()) + log.Infof("Upload from %s to %s recursively", localDir, g.backend.Uri()) return nil } @@ -334,7 +337,8 @@ func (g *GS) downloadToFile(file, key string) error { } defer rc.Close() - if _, err := io.Copy(f, rc); err != nil { + written, err := io.Copy(f, rc) + if err != nil { return fmt.Errorf("io.Copy: %w", err) } @@ -342,6 +346,7 @@ func (g *GS) downloadToFile(file, key string) error { return fmt.Errorf("f.Close: %w", err) } + log.Infof("Download from %s to %s successfully, bytes=%d", key, file, written) return nil } @@ -352,14 +357,16 @@ func (g *GS) downloadPrefix(localPath, uri, baseUri string) error { // localPath: /tmp // uri: gs://nebula-2024/BACKUP_2024_01_20_01_20_56/data/2/3/... // baseUri: gs://nebula-2024/BACKUP_2024_01_20_01_20_56/ -// object key: BACKUP_2023_11_14_04_49_54/data/2/3/data/000009.sst +// object key: BACKUP_2024_01_20_01_20_56/data/2/3/data/000009.sst func (g *GS) downloadToDir(localPath, uri, baseUri string) error { pathInfo, err := g.gcsFS.Stat(uri) if errors.Is(err, gcsfs.ErrFileNotFound) { + log.Errorf("URI %s not found", uri) // return early if file doesn't exist return nil } if err != nil { + log.Errorf("stat URI %s failed: %v", uri, err) return err } @@ -390,7 +397,7 @@ func (g *GS) downloadToDir(localPath, uri, baseUri string) error { return err } for _, name := range names { - subUri := uri + "/" + name + subUri := getSubUri(uri, name) err = g.downloadToDir(localPath, subUri, baseUri) if err != nil { return err @@ -432,6 +439,13 @@ func (g *GS) readdirNames(bucket, prefix string) ([]string, error) { return names, nil } +func getSubUri(uri, name string) string { + if strings.HasSuffix(uri, "/") { + return uri + name + } + return uri + "/" + name +} + func getObjectKey(uri string) (string, error) { u, err := url.Parse(uri) if err != nil {