diff --git a/internal/backend_s3.go b/internal/backend_s3.go index c602aa63..cf3d7d2d 100644 --- a/internal/backend_s3.go +++ b/internal/backend_s3.go @@ -512,7 +512,19 @@ func (s *S3Backend) getRequestId(r *request.Request) string { r.HTTPResponse.Header.Get("x-amz-id-2") } -func (s *S3Backend) HeadBlob(param *HeadBlobInput) (*HeadBlobOutput, error) { +// FIXME: Move retries to common code from S3 +func (s *S3Backend) HeadBlob(req *HeadBlobInput) (resp *HeadBlobOutput, err error) { + s.readBackoff(func(attempt int) error { + resp, err = s.tryHeadBlob(req) + if err != nil && shouldRetry(err) { + s3Log.Errorf("Error getting metadata of %v (attempt %v): %v\n", req.Key, attempt, err) + } + return err + }) + return +} + +func (s *S3Backend) tryHeadBlob(param *HeadBlobInput) (*HeadBlobOutput, error) { head := s3.HeadObjectInput{Bucket: &s.bucket, Key: ¶m.Key, } @@ -542,7 +554,20 @@ func (s *S3Backend) HeadBlob(param *HeadBlobInput) (*HeadBlobOutput, error) { }, nil } -func (s *S3Backend) ListBlobs(param *ListBlobsInput) (*ListBlobsOutput, error) { +// FIXME: Move retries to common code from S3 +func (s *S3Backend) ListBlobs(req *ListBlobsInput) (resp *ListBlobsOutput, err error) { + s.readBackoff(func(attempt int) error { + resp, err = s.tryListBlobs(req) + if err != nil && shouldRetry(err) { + s3Log.Errorf("Error listing objects with prefix=%v delimiter=%v start-after=%v max-keys=%v (attempt %v): %v\n", + NilStr(req.Prefix), NilStr(req.Delimiter), NilStr(req.StartAfter), NilUInt32(req.MaxKeys), attempt, err) + } + return err + }) + return +} + +func (s *S3Backend) tryListBlobs(param *ListBlobsInput) (*ListBlobsOutput, error) { var maxKeys *int64 if param.MaxKeys != nil { @@ -868,7 +893,49 @@ func (s *S3Backend) CopyBlob(param *CopyBlobInput) (*CopyBlobOutput, error) { return &CopyBlobOutput{s.getRequestId(req)}, nil } -func (s *S3Backend) GetBlob(param *GetBlobInput) (*GetBlobOutput, error) { +func shouldRetry(err error) bool { + err = mapAwsError(err) + return err != syscall.ENOENT && err != syscall.EINVAL && + err != syscall.EACCES && err != syscall.ENOTSUP && err != syscall.ERANGE +} + +// FIXME: Add similar write backoff (now it's handled by file/dir code) +func (s *S3Backend) readBackoff(try func(attempt int) error) (err error) { + interval := s.flags.ReadRetryInterval + attempt := 1 + for { + err = try(attempt) + if err != nil { + if shouldRetry(err) && (s.flags.ReadRetryAttempts < 1 || attempt < s.flags.ReadRetryAttempts) { + attempt++ + time.Sleep(interval) + interval = time.Duration(s.flags.ReadRetryMultiplier * float64(interval)) + if interval > s.flags.ReadRetryMax { + interval = s.flags.ReadRetryMax + } + } else { + break + } + } else { + break + } + } + return +} + +// FIXME: Move retries to common code from S3 +func (s *S3Backend) GetBlob(req *GetBlobInput) (resp *GetBlobOutput, err error) { + s.readBackoff(func(attempt int) error { + resp, err = s.tryGetBlob(req) + if err != nil && shouldRetry(err) { + log.Errorf("Error reading %v +%v of %v (attempt %v): %v", req.Start, req.Count, req.Key, attempt, err) + } + return err + }) + return +} + +func (s *S3Backend) tryGetBlob(param *GetBlobInput) (*GetBlobOutput, error) { get := s3.GetObjectInput{ Bucket: &s.bucket, Key: ¶m.Key, diff --git a/internal/cfg/config.go b/internal/cfg/config.go index b765cfe7..1e5ff639 100644 --- a/internal/cfg/config.go +++ b/internal/cfg/config.go @@ -67,6 +67,10 @@ type FlagStorage struct { MaxParallelCopy int StatCacheTTL time.Duration HTTPTimeout time.Duration + ReadRetryInterval time.Duration + ReadRetryMultiplier float64 + ReadRetryMax time.Duration + ReadRetryAttempts int RetryInterval time.Duration ReadAheadKB uint64 SmallReadCount uint64 diff --git a/internal/cfg/flags.go b/internal/cfg/flags.go index d7e3017f..198e97ca 100644 --- a/internal/cfg/flags.go +++ b/internal/cfg/flags.go @@ -500,7 +500,31 @@ MISC OPTIONS: cli.DurationFlag{ Name: "retry-interval", Value: 30 * time.Second, - Usage: "Retry unsuccessful flushes after this amount of time", + Usage: "Retry unsuccessful writes after this time", + }, + + cli.DurationFlag{ + Name: "read-retry-interval", + Value: 1 * time.Second, + Usage: "Initial interval for retrying unsuccessful reads", + }, + + cli.Float64Flag{ + Name: "read-retry-mul", + Value: 2, + Usage: "Increase read retry interval this number of times on each unsuccessful attempt", + }, + + cli.DurationFlag{ + Name: "read-retry-max-interval", + Value: 60 * time.Second, + Usage: "Maximum interval for retrying unsuccessful reads", + }, + + cli.DurationFlag{ + Name: "read-retry-attempts", + Value: 0, + Usage: "Maximum read retry attempts (0 means unlimited)", }, cli.IntFlag{ @@ -758,6 +782,10 @@ func PopulateFlags(c *cli.Context) (ret *FlagStorage) { StatCacheTTL: c.Duration("stat-cache-ttl"), HTTPTimeout: c.Duration("http-timeout"), RetryInterval: c.Duration("retry-interval"), + ReadRetryInterval: c.Duration("read-retry-interval"), + ReadRetryMultiplier: c.Float64("read-retry-mul"), + ReadRetryMax: c.Duration("read-retry-max-interval"), + ReadRetryAttempts: c.Int("read-retry-attempts"), ReadAheadKB: uint64(c.Int("read-ahead")), SmallReadCount: uint64(c.Int("small-read-count")), SmallReadCutoffKB: uint64(c.Int("small-read-cutoff")), diff --git a/internal/utils.go b/internal/utils.go index 11cca287..e61a0308 100644 --- a/internal/utils.go +++ b/internal/utils.go @@ -115,6 +115,14 @@ func PTime(v time.Time) *time.Time { return &v } +func NilUInt32(v *uint32) uint32 { + if v == nil { + return 0 + } else { + return *v + } +} + func NilInt64(v *int64) int64 { if v == nil { return 0