Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
reshke committed Mar 23, 2024
1 parent 19fc0d5 commit 21bd4d2
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
17 changes: 12 additions & 5 deletions pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type YproxyRetryReader struct {
bytesWrite int64
retryCnt int64
retryLimit int
needReacquire bool
reacquireReaderFn func(offsetStart int64) (io.ReadCloser, error)
}

Expand All @@ -34,11 +35,10 @@ func (y *YproxyRetryReader) Close() error {
// Read implements io.ReadCloser.
func (y *YproxyRetryReader) Read(p []byte) (int, error) {

needReacquire := true

for retry := 0; retry < y.retryLimit; retry++ {

if needReacquire {
if y.needReacquire {

r, err := y.reacquireReaderFn(y.bytesWrite)

if err != nil {
Expand All @@ -52,10 +52,13 @@ func (y *YproxyRetryReader) Read(p []byte) (int, error) {
//
y.underlying = r

needReacquire = false
y.needReacquire = false
}

n, err := y.underlying.Read(p)
if err == io.EOF {
return n, err
}
if err != nil || n < 0 {
ylogger.Zero.Error().Err(err).Int("offset reached", int(y.bytesWrite)).Int("retry count", int(retry)).Msg("encounter read error")

Expand All @@ -64,9 +67,13 @@ func (y *YproxyRetryReader) Read(p []byte) (int, error) {

// try to reacquire connection to external storage and continue read
// from previously reached point

y.needReacquire = true
continue
} else {
y.bytesWrite += int64(n)

return n, err
}
}
return -1, fmt.Errorf("failed to unpload within retries")
Expand All @@ -81,6 +88,7 @@ func newYRetryReader(reacquireReaderFn func(offsetStart int64) (io.ReadCloser, e
reacquireReaderFn: reacquireReaderFn,
retryLimit: defaultRetryLimit,
bytesWrite: 0,
needReacquire: true,
}
}

Expand Down Expand Up @@ -112,7 +120,6 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient
ylogger.Zero.Debug().Str("object-path", msg.Name).Int64("offset", offsetStart).Msg("cat object with offset")
r, err := s.CatFileFromStorage(msg.Name, offsetStart)
if err != nil {
_ = ycl.ReplyError(err, "failed to read from external storage")
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64) (io.
Range: aws.String(fmt.Sprintf("bytes=%d-", offset)),
}

ylogger.Zero.Debug().Str("key", objectPath).Str("bucket",
ylogger.Zero.Debug().Str("key", objectPath).Int64("offset", offset).Str("bucket",
s.cnf.StorageBucket).Msg("requesting external storage")

object, err := sess.GetObject(input)
Expand Down

0 comments on commit 21bd4d2

Please sign in to comment.