Skip to content

Commit

Permalink
Merge pull request #59 from MegaByte875/retry_upload
Browse files Browse the repository at this point in the history
retries for uploading and downloading
  • Loading branch information
kqzh authored Mar 4, 2024
2 parents 6a1d4df + 2d118cc commit 0bbd267
Showing 1 changed file with 29 additions and 13 deletions.
42 changes: 29 additions & 13 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ import (
pb "github.com/vesoft-inc/nebula-agent/v3/pkg/proto"
)

const (
defaultUploadPartSize = 1024 * 1024 * 32
defaultDownloadPartSize = 1024 * 1024 * 32
maxRetries = 3
)

type S3 struct {
backend *pb.Backend

Expand Down Expand Up @@ -88,13 +94,18 @@ func (s *S3) downloadToFile(file, key string) error {
Key: aws.String(key),
}

downloader := s3manager.NewDownloader(s.sess)
numBytes, err := downloader.Download(fd, req)
if err != nil {
return fmt.Errorf("download from %s to %s failed: %w", key, file, err)
downloader := s3manager.NewDownloader(s.sess, func(u *s3manager.Downloader) {
u.PartSize = defaultDownloadPartSize
})
for i := 0; i < maxRetries; i++ {
numBytes, err := downloader.Download(fd, req)
if err == nil {
log.Debugf("Download from %s to %s successfully, bytes=%d.", key, file, numBytes)
break
}
log.Errorf("download from %s to %s failed: %v", key, file, err)
}

log.Debugf("Download from %s to %s successfully, bytes=%d.", key, file, numBytes)
return nil
}

Expand Down Expand Up @@ -159,17 +170,22 @@ func (s *S3) uploadToStorage(key, file string) error {
}
defer fd.Close()

uploader := s3manager.NewUploader(s.sess)
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.backend.GetS3().Bucket),
Key: aws.String(key),
Body: fd,
uploader := s3manager.NewUploader(s.sess, func(u *s3manager.Uploader) {
u.PartSize = defaultUploadPartSize
})
if err != nil {
return fmt.Errorf("upload from %s to %s failed: %w", file, key, err)
for i := 0; i < maxRetries; i++ {
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.backend.GetS3().Bucket),
Key: aws.String(key),
Body: fd,
})
if err == nil {
log.Debugf("Upload from %s to %s successfully.", file, key)
break
}
log.Errorf("upload from %s to %s failed: %v", file, key, err)
}

log.Debugf("Upload from %s to %s successfully.", file, key)
return nil
}

Expand Down

0 comments on commit 0bbd267

Please sign in to comment.