Commit: correction
sethiay committed Jun 1, 2024
1 parent 2a30645 commit 326fa8f
Showing 4 changed files with 212 additions and 12 deletions.
8 changes: 4 additions & 4 deletions internal/cache/file/cache_handle.go
@@ -53,7 +53,7 @@ type CacheHandle struct {
 }

 func NewCacheHandle(localFileHandle *os.File, fileDownloadJob *downloader.Job,
-	fileInfoCache *lru.Cache, cacheFileForRangeRead bool, initialOffset int64) *CacheHandle {
+	fileInfoCache *lru.Cache, cacheFileForRangeRead bool, initialOffset int64) *CacheHandle {
 	return &CacheHandle{
 		fileHandle:      localFileHandle,
 		fileDownloadJob: fileDownloadJob,
@@ -80,8 +80,8 @@ func (fch *CacheHandle) validateCacheHandle() error {
 // downloaded cache file. Otherwise, it returns an appropriate error message.
 func (fch *CacheHandle) shouldReadFromCache(jobStatus *downloader.JobStatus, requiredOffset int64) (err error) {
 	if jobStatus.Err != nil ||
-		jobStatus.Name == downloader.Invalid ||
-		jobStatus.Name == downloader.Failed {
+		jobStatus.Name == downloader.Invalid ||
+		jobStatus.Name == downloader.Failed {
 		err := fmt.Errorf("%s: jobStatus: %s jobError: %w", util.InvalidFileDownloadJobErrMsg, jobStatus.Name, jobStatus.Err)
 		return err
 	} else if jobStatus.Offset < requiredOffset {
@@ -254,7 +254,7 @@ func (fch *CacheHandle) IsSequential(currentOffset int64) bool {
 		return false
 	}
 	// Deliberately making isSeq false always so that the foreground read serves
-	// are not blocked
+	// are not blocked
 	return false
 }

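The always-false IsSequential in the hunk above is a deliberate design choice: a sequential-looking read could otherwise be made to wait on the download job. A rough, self-contained sketch of the idea follows; none of these names are gcsfuse APIs, and the real read path is more involved.

package main

import "fmt"

type cacheHandle struct{}

// IsSequential mirrors the hunk above: hard-coded to false so that the
// blocking "wait for the download job" path is never taken.
func (h *cacheHandle) IsSequential(currentOffset int64) bool {
	return false
}

func serveRead(h *cacheHandle, offset int64) string {
	if h.IsSequential(offset) {
		return "block until the cache download job reaches the offset"
	}
	return "serve the read without waiting on the download job"
}

func main() {
	h := &cacheHandle{}
	fmt.Println(serveRead(h, 0))
	fmt.Println(serveRead(h, 1<<20))
}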
4 changes: 4 additions & 0 deletions internal/cache/file/downloader/job.go
@@ -367,6 +367,10 @@ func (job *Job) downloadObjectAsync() {
 		}
 		start += maxRead
 		if start == newReaderLimit {
+			err = newReader.Close()
+			if err != nil {
+				logger.Errorf("Job:%p (%s:/%s) error while closing reader: %v", job, job.bucket.Name(), job.object.Name, err)
+			}
 			newReader = nil
 		}

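The added lines close the GCS reader as soon as the requested range has been fully consumed, instead of leaving it open until the surrounding loop finishes. Below is a minimal sketch of the same close-when-exhausted pattern using plain io interfaces; consumeRange and the buffer size are illustrative, not gcsfuse code.

package main

import (
	"fmt"
	"io"
	"strings"
)

// consumeRange reads exactly limit bytes from r and, mirroring the hunk
// above, closes r eagerly the moment the range is exhausted, then nils it
// out so a later iteration knows a fresh reader is needed.
func consumeRange(r io.ReadCloser, limit int64) error {
	var start int64
	buf := make([]byte, 4)
	for start < limit {
		maxRead := min(int64(len(buf)), limit-start)
		n, err := r.Read(buf[:maxRead])
		start += int64(n)
		if start == limit {
			if cerr := r.Close(); cerr != nil {
				fmt.Printf("error while closing reader: %v\n", cerr)
			}
			r = nil
			break
		}
		if err != nil {
			return err // propagate short reads such as io.EOF
		}
	}
	return nil
}

func main() {
	fmt.Println(consumeRange(io.NopCloser(strings.NewReader("hello world")), 11))
}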
196 changes: 196 additions & 0 deletions internal/cache/file/downloader/parallel_downloader.go
@@ -0,0 +1,196 @@
package downloader

import (
	"errors"
	"fmt"
	"io"
	"os"
	"strings"
	"sync"

	"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/lru"
	cacheutil "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/util"
	"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
	"github.com/googlecloudplatform/gcsfuse/v2/internal/monitor"
	"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
	"github.com/googlecloudplatform/gcsfuse/v2/internal/util"
	"golang.org/x/net/context"
)

// rangeDownloader downloads the byte range [start, end) of the job's object
// from GCS and writes it into the cache file at the same offset.
func (job *Job) rangeDownloader(start, end int64) (err error) {
	newReader, err := job.bucket.NewReader(
		job.cancelCtx,
		&gcs.ReadObjectRequest{
			Name:       job.object.Name,
			Generation: job.object.Generation,
			Range: &gcs.ByteRange{
				Start: uint64(start),
				Limit: uint64(end),
			},
			ReadCompressed: job.object.HasContentEncodingGzip(),
		})
	if err != nil {
		return
	}
	monitor.CaptureGCSReadMetrics(job.cancelCtx, util.Sequential, end-start)

	// Open the cache file and write into it.
	cacheFile, err := os.OpenFile(job.fileSpec.Path, os.O_RDWR, job.fileSpec.FilePerm)
	if err != nil {
		return
	}
	// On the error path just release both resources; otherwise close them,
	// logging a reader-close failure and returning a cache-file close failure
	// via the named return value.
	defer func() {
		if err != nil {
			_ = newReader.Close()
			_ = cacheFile.Close()
			return
		}
		err = newReader.Close()
		if err != nil {
			logger.Errorf("Job:%p (%s:/%s) error while closing reader: %v", job, job.bucket.Name(), job.object.Name, err)
		}
		err = cacheFile.Close()
		if err != nil {
			return
		}
	}()

	_, err = cacheFile.Seek(start, 0)
	if err != nil {
		err = fmt.Errorf("rangeDownloader: error while seeking file handle, seek %d: %w", start, err)
		return
	}
	// Copy the contents from NewReader to the cache file.
	_, readErr := io.CopyN(cacheFile, newReader, end-start)
	if readErr != nil {
		err = fmt.Errorf("rangeDownloader: error at the time of copying content to cache file: %w", readErr)
		return
	}
	return
}

// downloadObjectInParallelAsync downloads the backing GCS object into a file
// as part of the file cache, fetching multiple ranges in parallel using the
// NewReader method of gcs.Bucket.
//
// Note: There can only be one async download running for a job at a time.
// Acquires and releases LOCK(job.mu)
func (job *Job) downloadObjectInParallelAsync() {
	// Close the job.doneCh, clear the cancelFunc & cancelCtx and call the
	// remove-job callback function in any case - completion/failure.
	defer func() {
		job.cancelFunc()
		close(job.doneCh)

		job.mu.Lock()
		if job.removeJobCallback != nil {
			job.removeJobCallback()
			job.removeJobCallback = nil
		}
		job.cancelCtx, job.cancelFunc = nil, nil
		job.mu.Unlock()
	}()

	// Create, open and truncate the cache file the object will be written into.
	cacheFile, err := cacheutil.CreateFile(job.fileSpec, os.O_TRUNC|os.O_WRONLY)
	if err != nil {
		err = fmt.Errorf("downloadObjectInParallelAsync: error in creating cache file: %w", err)
		job.failWhileDownloading(err)
		return
	}
	// The file is truncated to its full size up front: without that, parallel
	// goroutines writing to the same file, even at disjoint ranges, could
	// corrupt the data.
	err = cacheFile.Truncate(int64(job.object.Size))
	if err != nil {
		err = fmt.Errorf("downloadObjectInParallelAsync: error while truncating the cache file: %w", err)
		job.failWhileDownloading(err)
		return
	}
	err = cacheFile.Close()
	if err != nil {
		err = fmt.Errorf("downloadObjectInParallelAsync: error while closing cache file: %w", err)
		job.failWhileDownloading(err)
		return
	}

	notifyInvalid := func() {
		job.mu.Lock()
		job.status.Name = Invalid
		job.notifySubscribers()
		job.mu.Unlock()
	}

	var start, end int64
	end = int64(job.object.Size)
	var readRequestSize int64 = int64(job.readRequestSizeMb * cacheutil.MiB)
	var oneStepTotalReadSize int64 = readRequestSize * int64(job.downloadParallelism)
	for {
		select {
		case <-job.cancelCtx.Done():
			return
		default:
			if start < end {
				// Download the ranges in parallel, each at most readRequestSize
				// bytes, using at most job.downloadParallelism goroutines at a time.
				asyncDownloaderErrors := make([]error, job.downloadParallelism)
				wg := sync.WaitGroup{}
				var singleStepRead int64 = 0
				for jobNum := 0; (jobNum < job.downloadParallelism) && (singleStepRead < oneStepTotalReadSize) && (start < end); jobNum++ {
					wg.Add(1)
					rangeStart := start
					rangeEnd := min(rangeStart+readRequestSize, end)
					downloadFunc := func(wg *sync.WaitGroup, assignedJobNum, rangeStart, rangeEnd int64) {
						defer wg.Done()
						asyncDownloaderErrors[assignedJobNum] = job.rangeDownloader(rangeStart, rangeEnd)
					}
					go downloadFunc(&wg, int64(jobNum), rangeStart, rangeEnd)
					singleStepRead = singleStepRead + rangeEnd - rangeStart
					start = rangeEnd
				}
				wg.Wait()
				// If any of the goroutines failed, consider the async job failed.
				for jobNum := 0; jobNum < job.downloadParallelism; jobNum++ {
					err = asyncDownloaderErrors[jobNum]
					if err != nil {
						if errors.Is(err, context.Canceled) {
							notifyInvalid()
							return
						}
						job.failWhileDownloading(err)
						return
					}
				}

				job.mu.Lock()
				job.status.Offset = start
				err = job.updateFileInfoCache()
				// Notify subscribers if the file info cache is updated.
				if err == nil {
					job.notifySubscribers()
				} else if strings.Contains(err.Error(), lru.EntryNotExistErrMsg) {
					// The download job expects an entry in the file info cache for
					// the file it is downloading. If the entry is deleted in between,
					// which is expected to happen at the time of eviction, then the
					// job should be marked Invalid instead of Failed.
					job.status.Name = Invalid
					job.notifySubscribers()
					logger.Tracef("Job:%p (%s:/%s) is no longer valid due to absence of entry in file info cache.", job, job.bucket.Name(), job.object.Name)
					job.mu.Unlock()
					return
				}
				job.mu.Unlock()
				// Change the status of the job in case of an error while updating
				// the file info cache.
				if err != nil {
					job.failWhileDownloading(err)
					return
				}
			} else {
				job.mu.Lock()
				job.status.Name = Completed
				job.notifySubscribers()
				job.mu.Unlock()
				return
			}
		}
	}
}
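To make the range arithmetic above concrete: each step schedules at most downloadParallelism goroutines, each fetching a range of at most readRequestSize bytes, so no more than oneStepTotalReadSize bytes are in flight per step. Here is a standalone sketch of the partitioning with illustrative values (a 10 MiB object, 3 MiB requests, parallelism 2 — not gcsfuse defaults); each printed range corresponds to one rangeDownloader call.

package main

import "fmt"

const MiB int64 = 1 << 20

func main() {
	objectSize := 10 * MiB
	readRequestSize := 3 * MiB
	downloadParallelism := 2
	oneStepTotalReadSize := readRequestSize * int64(downloadParallelism)

	var start int64
	end := objectSize
	for step := 1; start < end; step++ {
		var singleStepRead int64
		for jobNum := 0; jobNum < downloadParallelism && singleStepRead < oneStepTotalReadSize && start < end; jobNum++ {
			rangeEnd := min(start+readRequestSize, end)
			fmt.Printf("step %d, goroutine %d: bytes [%d, %d)\n", step, jobNum, start, rangeEnd)
			singleStepRead += rangeEnd - start
			start = rangeEnd
		}
		// In the real job, wg.Wait() runs here before the next step begins.
	}
}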
16 changes: 8 additions & 8 deletions internal/config/mount_config.go
@@ -49,14 +49,14 @@ const (
 	DefaultAnonymousAccess = false
 	DefaultEnableHNS       = false

-	// MetadataPrefetchOnMountDisabled is the mode without metadata-prefetch.
-	MetadataPrefetchOnMountDisabled string = "disabled"
-	// MetadataPrefetchOnMountSynchronous is the prefetch-mode where mounting is not marked complete until prefetch is complete.
-	MetadataPrefetchOnMountSynchronous string = "sync"
-	// MetadataPrefetchOnMountAsynchronous is the prefetch-mode where mounting is marked complete once prefetch has started.
-	MetadataPrefetchOnMountAsynchronous string = "async"
-	// DefaultMetadataPrefetchOnMount is default value of metadata-prefetch i.e. if not set by user; current it is MetadataPrefetchOnMountDisabled.
-	DefaultMetadataPrefetchOnMount = MetadataPrefetchOnMountDisabled
+	// ExperimentalMetadataPrefetchOnMountDisabled is the mode without metadata-prefetch.
+	ExperimentalMetadataPrefetchOnMountDisabled string = "disabled"
+	// ExperimentalMetadataPrefetchOnMountSynchronous is the prefetch-mode where mounting is not marked complete until prefetch is complete.
+	ExperimentalMetadataPrefetchOnMountSynchronous string = "sync"
+	// ExperimentalMetadataPrefetchOnMountAsynchronous is the prefetch-mode where mounting is marked complete once prefetch has started.
+	ExperimentalMetadataPrefetchOnMountAsynchronous string = "async"
+	// DefaultExperimentalMetadataPrefetchOnMount is the default value of metadata-prefetch if not set by the user; currently it is ExperimentalMetadataPrefetchOnMountDisabled.
+	DefaultExperimentalMetadataPrefetchOnMount = ExperimentalMetadataPrefetchOnMountDisabled

 	DefaultKernelListCacheTtlSeconds int64 = 0
 )
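Because the constants are renamed wholesale, any user-supplied mode string still has to resolve to one of the same three values. A hedged sketch of what validating the flag could look like follows; the constants are copied from the diff, but the helper and main are hypothetical, not gcsfuse's actual validation.

package main

import "fmt"

const (
	ExperimentalMetadataPrefetchOnMountDisabled     string = "disabled"
	ExperimentalMetadataPrefetchOnMountSynchronous  string = "sync"
	ExperimentalMetadataPrefetchOnMountAsynchronous string = "async"
)

// isValidMetadataPrefetchOnMount reports whether mode is one of the three
// supported metadata-prefetch modes.
func isValidMetadataPrefetchOnMount(mode string) bool {
	switch mode {
	case ExperimentalMetadataPrefetchOnMountDisabled,
		ExperimentalMetadataPrefetchOnMountSynchronous,
		ExperimentalMetadataPrefetchOnMountAsynchronous:
		return true
	}
	return false
}

func main() {
	fmt.Println(isValidMetadataPrefetchOnMount("async"))    // true
	fmt.Println(isValidMetadataPrefetchOnMount("eventual")) // false
}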
