Skip to content

Commit

Permalink
Merge pull request #306 from sonroyaalmerol/improve-log-overhead
Browse files Browse the repository at this point in the history
back to simple readat
  • Loading branch information
sonroyaalmerol authored Mar 2, 2025
2 parents 7691b62 + 14c6123 commit 6626bdb
Showing 1 changed file with 16 additions and 129 deletions.
145 changes: 16 additions & 129 deletions internal/backend/arpc/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
package arpcfs

import (
"errors"
"fmt"
"io"
"os"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -58,138 +55,28 @@ func (f *ARPCFile) ReadAt(p []byte, off int64) (int, error) {
return 0, os.ErrInvalid
}

if len(p) == 0 {
return 0, nil
req := vssfs.ReadAtReq{
HandleID: f.handleID,
Offset: off,
Length: len(p),
}

const (
maxChunkSize = 1 << 20 // 1MB chunks
minChunkSize = 1 << 16 // 64KB minimum
maxRetries = 3
retryDelay = 100 * time.Millisecond
)

totalBytesRead := 0
remaining := len(p)
retryCount := 0

// Get optimal chunk size
chunkSize := maxChunkSize
if remaining < maxChunkSize*4 {
chunkSize = max(minChunkSize, remaining/4)
}

reqBuf := bufferPool.Get().([]byte)
defer bufferPool.Put(reqBuf)

for remaining > 0 {
if err := f.fs.ctx.Err(); err != nil {
return totalBytesRead, err
}

currentChunk := min(remaining, chunkSize)
req := vssfs.ReadAtReq{
HandleID: f.handleID,
Offset: off + int64(totalBytesRead),
Length: currentChunk,
}

reqBytes, err := req.MarshalMsg(reqBuf[:0])
if err != nil {
return totalBytesRead, os.ErrInvalid
}

// Try to read with retries
bytesRead, err := func() (int, error) {
for retryCount < maxRetries {
if retryCount > 0 {
// Exponential backoff
delay := retryDelay * time.Duration(1<<uint(retryCount-1))
time.Sleep(delay)
}

n, err := f.fs.session.CallMsgWithBuffer(
f.fs.ctx,
f.jobId+"/ReadAt",
reqBytes,
p[totalBytesRead:totalBytesRead+currentChunk],
)

if err == nil || err == io.EOF {
retryCount = 0 // Reset retry count on success
return n, err
}

if !isRetryableError(err) {
return n, err // Don't retry non-retryable errors
}

retryCount++
}
return 0, fmt.Errorf("max retries (%d) exceeded", maxRetries)
}()

// Update progress
if bytesRead > 0 {
totalBytesRead += bytesRead
remaining -= bytesRead
go atomic.AddInt64(&f.fs.totalBytes, int64(bytesRead))
}

// Handle errors
if err != nil {
if err == io.EOF && totalBytesRead > 0 {
return totalBytesRead, nil
}
return totalBytesRead, err
}

if bytesRead == 0 {
break
}
}

if totalBytesRead > 0 {
return totalBytesRead, nil
reqBytes, err := req.MarshalMsg(nil)
if err != nil {
return 0, os.ErrInvalid
}
return 0, io.EOF
}

// Helper to identify retryable errors
func isRetryableError(err error) bool {
if err == nil {
return false
bytesRead, err := f.fs.session.CallMsgWithBuffer(f.fs.ctx, f.jobId+"/ReadAt", reqBytes, p)
if err != nil {
syslog.L.Errorf("Read RPC failed (%s): %v", f.name, err)
return 0, err
}

// Unwrap the error since most are wrapped with fmt.Errorf
err = errors.Unwrap(err)
if err == nil {
return false
}
go atomic.AddInt64(&f.fs.totalBytes, int64(bytesRead))

// Check specific error strings/types that occur in CallMsgWithBuffer
switch {
case errors.Is(err, io.EOF):
// EOF during response read or length prefix read is retryable
return true
case strings.Contains(err.Error(), "failed to open stream"):
// Stream opening failures are retryable
return true
case strings.Contains(err.Error(), "failed to read response"):
// Response reading failures are retryable
return true
case strings.Contains(err.Error(), "failed to read length prefix"):
// Length prefix reading failures are retryable
return true
case strings.Contains(err.Error(), "read error after"):
// Data reading failures are retryable
return true
// If we read less than requested, it indicates EOF
if bytesRead < len(p) {
return bytesRead, io.EOF
}

// Non-retryable errors:
// - "failed to marshal request" (marshaling errors are not network-related)
// - "failed to unmarshal response" (unmarshaling errors indicate protocol issues)
// - "RPC error: status" (application-level errors)

return false
return bytesRead, nil
}

0 comments on commit 6626bdb

Please sign in to comment.